diff --git a/pyproject.toml b/pyproject.toml index 4bda7566e..8fd41a22c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,8 +98,10 @@ murfey = "murfey.client:run" [project.entry-points."murfey.config.extraction"] "murfey_machine" = "murfey.util.config:get_extended_machine_config" [project.entry-points."murfey.workflows"] -"lif_to_stack" = "murfey.workflows.lif_to_stack:zocalo_cluster_request" -"tiff_to_stack" = "murfey.workflows.tiff_to_stack:zocalo_cluster_request" +"process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" +"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" +"register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" +"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 2ce45ad27..00cec7dc6 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -386,7 +386,7 @@ def process_lif_file( try: # Construct the URL to post the request to - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/lif_to_stack?lif_file={quote(str(lif_file), safe='')}" + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/process_raw_lifs?lif_file={quote(str(lif_file), safe='')}" # Validate if not url: logger.error( @@ -442,7 +442,7 @@ def process_tiff_series( try: # Construct URL for Murfey server to communicate with - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/tiff_to_stack" + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/process_raw_tiffs" if not url: logger.error( "URL could not be constructed from the environment and file path" diff --git 
a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index be4cc10c3..ae7abcf81 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -56,6 +56,9 @@ from murfey.server.ispyb import TransportManager # Session except AttributeError: pass +from backports.entry_points_selectable import entry_points +from importlib_metadata import EntryPoint # For type hinting only + import murfey.util.db as db from murfey.util import LogFilter from murfey.util.spa_params import default_spa_parameters @@ -2964,6 +2967,26 @@ def feedback_callback(header: dict, message: dict) -> None: if _transport_object: _transport_object.transport.ack(header) return None + elif ( + message["register"] in entry_points().select(group="murfey.workflows").names + ): + # Run the workflow if a match is found + workflow: EntryPoint = list( # Returns a list of either 1 or 0 + entry_points().select( + group="murfey.workflows", name=message["register"] + ) + )[0] + result = workflow.load()( + message=message, + db=murfey_db, + ) + if _transport_object: + if result: + _transport_object.transport.ack(header) + else: + # Send it directly to DLQ without trying to rerun it + _transport_object.transport.nack(header, requeue=False) + return None if _transport_object: _transport_object.transport.nack(header, requeue=False) return None diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index bcbdde134..0f948c36f 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -1,12 +1,13 @@ from __future__ import annotations import re -import sys import traceback +from importlib.metadata import EntryPoint # type hinting only from logging import getLogger from pathlib import Path from typing import Optional, Type, Union +from backports.entry_points_selectable import entry_points from fastapi import APIRouter from sqlalchemy.exc import NoResultFound from sqlmodel import Session, select @@ -22,13 +23,7 @@ CLEMTIFFFile, ) from murfey.util.db 
import Session as MurfeySession -from murfey.util.models import TiffSeriesInfo - -# Use backport from importlib_metadata for Python <3.10 -if sys.version_info.major == 3 and sys.version_info.minor < 10: - from importlib_metadata import EntryPoint, entry_points -else: - from importlib.metadata import EntryPoint, entry_points +from murfey.util.models import TIFFSeriesInfo # Set up logger logger = getLogger("murfey.server.api.clem") @@ -81,23 +76,15 @@ def validate_and_sanitise( machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - rsync_basepath = machine_config.rsync_basepath - try: - base_path = list(rsync_basepath.parents)[-2].as_posix() - except IndexError: - # Print to troubleshoot - logger.warning(f"Base path {rsync_basepath!r} is too short") - base_path = rsync_basepath.as_posix() - except Exception: - raise Exception("Unexpected exception occurred when loading the file base path") + base_path = machine_config.rsync_basepath.as_posix() # Check that full file path doesn't contain unallowed characters - # Currently allows only: - # - words (alphanumerics and "_"; \w), - # - spaces (\s), - # - periods, - # - dashes, - # - forward slashes ("/") + # Currently allows only: + # - words (alphanumerics and "_"; \w), + # - spaces (\s), + # - periods, + # - dashes, + # - forward slashes ("/") if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: raise ValueError(f"Unallowed characters present in {file}") @@ -631,51 +618,68 @@ def register_image_stack( """ -@router.post("/sessions/{session_id}/lif_to_stack") # API posts to this URL -def lif_to_stack( +@router.post( + "/sessions/{session_id}/clem/preprocessing/process_raw_lifs" +) # API posts to this URL +def process_raw_lifs( session_id: int, # Used by the decorator lif_file: Path, + db: Session = murfey_db, ): - # Get command line entry point - murfey_workflows = entry_points().select( - group="murfey.workflows", name="lif_to_stack" - ) - - # Use entry point if found - if 
len(murfey_workflows) == 1: - workflow: EntryPoint = list(murfey_workflows)[0] - workflow.load()( - # Match the arguments found in murfey.workflows.lif_to_stack - file=lif_file, - root_folder="images", - messenger=_transport_object, - ) - return True - # Raise error if Murfey workflow not found - else: + try: + # Try and load relevant Murfey workflow + workflow: EntryPoint = list( + entry_points().select(group="murfey.workflows", name="process_raw_lifs") + )[0] + except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") + # Get instrument name from the database to load the correct config file + session_row: MurfeySession = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = session_row.instrument_name + + # Pass arguments along to the correct workflow + workflow.load()( + # Match the arguments found in murfey.workflows.clem.process_raw_lifs + file=lif_file, + root_folder="images", + session_id=session_id, + instrument_name=instrument_name, + messenger=_transport_object, + ) + return True + -@router.post("/sessions/{session_id}/tiff_to_stack") -def tiff_to_stack( +@router.post("/sessions/{session_id}/clem/preprocessing/process_raw_tiffs") +def process_raw_tiffs( session_id: int, # Used by the decorator - tiff_info: TiffSeriesInfo, + tiff_info: TIFFSeriesInfo, + db: Session = murfey_db, ): - # Get command line entry point - murfey_workflows = entry_points().select( - group="murfey.workflows", name="tiff_to_stack" - ) - - # Use entry point if found - if murfey_workflows: - workflow: EntryPoint = list(murfey_workflows)[0] - workflow.load()( - # Match the arguments found in murfey.workflows.tiff_to_stack - file=tiff_info.tiff_files[0], # Pass it only one file from the list - root_folder="images", - metadata=tiff_info.series_metadata, - messenger=_transport_object, - ) - # Raise error if Murfey workflow not found - else: + try: + # Try and load relevant Murfey workflow + workflow: EntryPoint = 
list( + entry_points().select(group="murfey.workflows", name="process_raw_tiffs") + )[0] + except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") + + # Get instrument name from the database to load the correct config file + session_row: MurfeySession = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = session_row.instrument_name + + # Pass arguments to correct workflow + workflow.load()( + # Match the arguments found in murfey.workflows.clem.process_raw_tiffs + tiff_list=tiff_info.tiff_files, + root_folder="images", + session_id=session_id, + instrument_name=instrument_name, + metadata=tiff_info.series_metadata, + messenger=_transport_object, + ) + return True diff --git a/src/murfey/server/murfey_db.py b/src/murfey/server/murfey_db.py index e47c55902..2d0d52cf7 100644 --- a/src/murfey/server/murfey_db.py +++ b/src/murfey/server/murfey_db.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os from functools import partial import yaml @@ -11,8 +10,6 @@ from murfey.util.config import Security, get_security_config -instrument_name = os.getenv("BEAMLINE", "") - def url(security_config: Security | None = None) -> str: security_config = security_config or get_security_config() diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index e18fe18bd..bca698f66 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -269,9 +269,12 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore ) # One to many # Process checklist for series - images_aligned: bool = False # Image stacks aligned to reference image - rgbs_created: bool = False # Image stacks all colorised - composite_created: bool = False # Composite flattened image created + number_of_members: int = ( + 0 # Expected number of image stacks belonging to this series + ) + images_aligned: bool = False # Have all members been aligned? + rgbs_created: bool = False # Have all members been colourised? 
+ composite_created: bool = False # Has a composite image been created? composite_image: Optional[str] = None # Full path to composite image diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index a01943c6f..7af87f754 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -1,10 +1,11 @@ from __future__ import annotations +from ast import literal_eval from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, validator """ General Models @@ -154,12 +155,46 @@ class FractionationParameters(BaseModel): """ -class TiffSeriesInfo(BaseModel): +class TIFFSeriesInfo(BaseModel): series_name: str tiff_files: List[Path] series_metadata: Path +class LIFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + channel: str + number_of_members: int + parent_lif: Path + + +class TIFFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + channel: str + number_of_members: int + parent_tiffs: list[Path] + + @validator( + "parent_tiffs", + pre=True, + ) + def parse_stringified_list(cls, value): + if isinstance(value, str): + try: + eval_result = literal_eval(value) + if isinstance(eval_result, list): + parent_tiffs = [Path(p) for p in eval_result] + return parent_tiffs + except (SyntaxError, ValueError): + raise ValueError("Unable to parse input") + # Return value as-is; if it fails, it fails + return value + + """ FIB === diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/murfey/workflows/clem/process_raw_lifs.py b/src/murfey/workflows/clem/process_raw_lifs.py new file mode 100644 index 000000000..d02e6ac1e --- /dev/null +++ b/src/murfey/workflows/clem/process_raw_lifs.py @@ -0,0 +1,70 @@ +""" +Script to allow Murfey to submit the LIF-to-STACK job to the cluster. 
+The recipe referred to here is stored on GitLab. +""" + +from pathlib import Path +from typing import Optional + +from murfey.util.config import get_machine_config + +try: + from murfey.server.ispyb import TransportManager # Session +except AttributeError: + pass # Ignore if ISPyB credentials environment variable not set + + +def zocalo_cluster_request( + file: Path, + root_folder: str, + session_id: int, # Provided by the client via the API endpoint + instrument_name: str, # Acquired by looking up the Session table + messenger: Optional[TransportManager] = None, +): + if messenger: + # Use file path parts to construct parameters + path_parts = list((file.parent / file.stem).parts) + # Replace leading "/" in Unix paths + path_parts[0] = "" if path_parts[0] == "/" else path_parts[0] + try: + # Find the position of the root folder in the list + root_index = [p.lower() for p in path_parts].index(root_folder.lower()) + except ValueError: + raise Exception( + f"Unable to find the root folder {root_folder!r} in the file path {file!r}" + ) + + # Construct the session and job name + session_dir = "/".join(path_parts[:root_index]) + job_name = "--".join( + [p.replace(" ", "_") if " " in p else p for p in path_parts][ + root_index + 1 : + ] + ) + + # Load machine config to get the feedback queue + machine_config = get_machine_config() + feedback_queue = machine_config[instrument_name].feedback_queue + + # Send the message + # The keys under "parameters" will populate all the matching fields in {} + # in the processing recipe + messenger.send( + "processing_recipe", + { + "recipes": ["clem-lif-to-stack"], + "parameters": { + # Job parameters + "lif_file": f"{str(file)}", + "root_folder": root_folder, + # Other recipe parameters + "session_dir": f"{str(session_dir)}", + "session_id": session_id, + "job_name": job_name, + "feedback_queue": feedback_queue, + }, + }, + new_connection=True, + ) + else: + raise Exception("Unable to find transport manager") diff --git 
a/src/murfey/workflows/clem/process_raw_tiffs.py b/src/murfey/workflows/clem/process_raw_tiffs.py new file mode 100644 index 000000000..c72ff3c26 --- /dev/null +++ b/src/murfey/workflows/clem/process_raw_tiffs.py @@ -0,0 +1,74 @@ +""" +Script to allow Murfey to submit the TIFF-to-stack job to the cluster. +The recipe referred to here is stored on GitLab. +""" + +from pathlib import Path +from typing import Optional + +from murfey.util.config import get_machine_config + +try: + from murfey.server.ispyb import TransportManager # Session +except AttributeError: + pass # Ignore if ISPyB credentials environment variable not set + + +def zocalo_cluster_request( + tiff_list: list[Path], + root_folder: str, + session_id: int, + instrument_name: str, + metadata: Optional[Path] = None, + messenger: Optional[TransportManager] = None, +): + if messenger: + # Construct path to session directory + path_parts = list(tiff_list[0].parts) + # Replace leading "/" in Unix paths + path_parts[0] = "" if path_parts[0] == "/" else path_parts[0] + try: + # Find the position of the root folder in the list + root_index = [p.lower() for p in path_parts].index(root_folder.lower()) + except ValueError: + raise Exception( + f"Unable to find the root folder {root_folder!r} in the file path {tiff_list[0]!r}" + ) + # Construct the session and job name + session_dir = "/".join(path_parts[:root_index]) + job_name = "--".join( + [p.replace(" ", "_") if " " in p else p for p in path_parts][ + root_index + 1 : + ] + ) + + # If no metadata file provided, generate path to one + if metadata is None: + series_name = tiff_list[0].stem.split("--")[0] + metadata = tiff_list[0].parent / "Metadata" / (series_name + ".xlif") + + # Load machine config to get the feedback queue + machine_config = get_machine_config() + feedback_queue = machine_config[instrument_name].feedback_queue + + messenger.send( + "processing_recipe", + { + "recipes": ["clem-tiff-to-stack"], + "parameters": { + # Job parameters + "tiff_list": 
"null", + "tiff_file": f"{str(tiff_list[0])}", + "root_folder": root_folder, + "metadata": f"{str(metadata)}", + # Other recipe parameters + "session_dir": f"{str(session_dir)}", + "session_id": session_id, + "job_name": job_name, + "feedback_queue": feedback_queue, + }, + }, + new_connection=True, + ) + else: + raise Exception("Unable to find transport manager") diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py new file mode 100644 index 000000000..bb111f23e --- /dev/null +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -0,0 +1,397 @@ +""" +Functions to process the requests received by Murfey related to the CLEM workflow. + +The CLEM-related file registration API endpoints can eventually be moved here, since +the file registration processes all take place on the server side only. +""" + +from __future__ import annotations + +import json +import logging +import re +import traceback +from pathlib import Path +from typing import Optional, Type, Union + +from sqlalchemy.exc import NoResultFound +from sqlmodel import Session, select + +from murfey.util.config import get_machine_config +from murfey.util.db import ( + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +) +from murfey.util.db import Session as MurfeySession +from murfey.util.models import LIFPreprocessingResult, TIFFPreprocessingResult + +logger = logging.getLogger("murfey.workflows.clem.register_results") + + +def _validate_and_sanitise( + file: Path, + session_id: int, + db: Session, +) -> Path: + """ + Performs validation and sanitisation on the incoming file paths, ensuring that + no forbidden characters are present and that the path points only to allowed + sections of the file server. + + Returns the file path as a sanitised string that can be converted into a Path + object again.
+ + NOTE: Due to the instrument name query, 'db' now needs to be passed as an + explicit variable to this function from within a FastAPI endpoint, as using the + instance that was imported directly won't load it in the correct state. + """ + + valid_file_types = ( + ".lif", + ".tif", + ".tiff", + ".xlif", + ".xml", + ) + + # Resolve symlinks and directory changes to get full file path + full_path = Path(file).resolve() + + # Use machine configuration to validate which file base paths are accepted from + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + rsync_basepath = machine_config.rsync_basepath + try: + base_path = list(rsync_basepath.parents)[-2].as_posix() + except IndexError: + logger.warning(f"Base path {rsync_basepath!r} is too short") + base_path = rsync_basepath.as_posix() + except Exception as e: + raise Exception( + f"Unexpected exception encountered when loading the file base path: {e}" + ) + + # Check that full file path doesn't contain unallowed characters + # Currently allows only: + # - words (alphanumerics and "_"; \w), + # - spaces (\s), + # - periods, + # - dashes, + # - forward slashes ("/") + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: + raise ValueError(f"Unallowed characters present in {file}") + + # Check that it's not accessing somewhere it's not allowed + if not str(full_path).startswith(str(base_path)): + raise ValueError(f"{file} points to a directory that is not permitted") + + # Check that it's a file, not a directory + if full_path.is_file() is False: + raise ValueError(f"{file} is not a file") + + # Check that it is of a permitted file type + if f"{full_path.suffix}" not in valid_file_types: + raise ValueError(f"{full_path.suffix} is not a permitted file format") + + return full_path + + +def get_db_entry( + db: Session, + # With the database
search function having been moved out of the FastAPI + # endpoint, the database now has to be explicitly passed within the FastAPI + # endpoint function in order for it to be loaded in the correct state. + table: Type[ + Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, + ] + ], + session_id: int, + file_path: Optional[Path] = None, + series_name: Optional[str] = None, +) -> Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +]: + """ + Searches the CLEM workflow-related tables in the Murfey database for an entry that + matches the file path or series name within a given session. Returns the entry if + a match is found, otherwise registers it as a new entry in the database. + """ + + # Validate that parameters are provided correctly + if file_path is None and series_name is None: + raise ValueError( + "One of either 'file_path' or 'series_name' has to be provided" + ) + if file_path is not None and series_name is not None: + raise ValueError("Only one of 'file_path' or 'series_name' should be provided") + + # Validate file path if provided + if file_path is not None: + try: + file_path = _validate_and_sanitise(file_path, session_id, db) + except Exception: + raise Exception + + # Validate series name to use + if series_name is not None: + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: + raise ValueError("One or more characters in the string are not permitted") + + # Return database entry if it exists + try: + db_entry = ( + db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.file_path == str(file_path)) + ).one() + if file_path is not None + else db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.series_name == series_name) + ).one() + ) + # Create and register new entry if not present + except NoResultFound: + db_entry = ( + table( + file_path=str(file_path), + session_id=session_id, + ) + if file_path
is not None + else table( + series_name=series_name, + session_id=session_id, + ) + ) + db.add(db_entry) + db.commit() + db.refresh(db_entry) + except Exception: + raise Exception + + return db_entry + + +def register_lif_preprocessing_result( + message: dict, db: Session, demo: bool = False +) -> bool: + """ + session_id (recipe) + register (wrapper) + result (wrapper) + key1 + key2 + ... + """ + + session_id: int = ( + int(message["session_id"]) + if not isinstance(message["session_id"], int) + else message["session_id"] + ) + + # Validate message and try and load results + if isinstance(message["result"], str): + try: + json_obj: dict = json.loads(message["result"]) + result = LIFPreprocessingResult(**json_obj) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing LIF preprocessing result") + return False + elif isinstance(message["result"], dict): + try: + result = LIFPreprocessingResult(**message["result"]) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing LIF preprocessing result") + return False + else: + logger.error( + f"Invalid type for LIF preprocessing result: {type(message['result'])}" + ) + return False + + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) + + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + + clem_lif_file: CLEMLIFFile = get_db_entry( + db=db, + table=CLEMLIFFile, + session_id=session_id, + file_path=result.parent_lif, + ) + + # Link tables to one another and populate fields + clem_img_stk.associated_metadata = clem_metadata + 
clem_img_stk.parent_lif = clem_lif_file + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) + db.commit() + db.refresh(clem_img_stk) + + clem_img_series.associated_metadata = clem_metadata + clem_img_series.parent_lif = clem_lif_file + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) + + clem_metadata.parent_lif = clem_lif_file + db.add(clem_metadata) + db.commit() + db.refresh(clem_metadata) + + logger.info( + f"LIF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + ) + return True + + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Exception encountered when registering LIF preprocessing result for {result.series_name!r} {result.channel!r} image stack" + ) + return False + + finally: + db.close() + + +def register_tiff_preprocessing_result( + message: dict, db: Session, demo: bool = False +) -> bool: + + session_id: int = ( + int(message["session_id"]) + if not isinstance(message["session_id"], int) + else message["session_id"] + ) + if isinstance(message["result"], str): + try: + json_obj: dict = json.loads(message["result"]) + result = TIFFPreprocessingResult(**json_obj) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing TIFF preprocessing result") + return False + elif isinstance(message["result"], dict): + try: + result = TIFFPreprocessingResult(**message["result"]) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing TIFF preprocessing result") + return False + else: + logger.error( + f"Invalid type for TIFF preprocessing result: {type(message['result'])}" + ) + return False + + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + 
table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + + # Link tables to one another and populate fields + # Register TIFF files and populate them iteratively first + for file in result.parent_tiffs: + clem_tiff_file: CLEMTIFFFile = get_db_entry( + db=db, + table=CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + db.add(clem_tiff_file) + db.commit() + db.refresh(clem_tiff_file) + + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) + db.commit() + db.refresh(clem_img_stk) + + clem_img_series.associated_metadata = clem_metadata + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) + + logger.info( + f"TIFF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + ) + return True + + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Exception encountered when registering TIFF preprocessing result for {result.series_name!r} {result.channel!r} image stack" + ) + return False + + finally: + db.close() diff --git a/src/murfey/workflows/lif_to_stack.py b/src/murfey/workflows/lif_to_stack.py deleted file mode 100644 index 5cae568f6..000000000 --- a/src/murfey/workflows/lif_to_stack.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -Script to allow Murfey to submit the LIF-to-STACK job to the cluster. 
-The recipe referred to here is stored on GitLab. -""" - -from pathlib import Path -from typing import Optional - -try: - from murfey.server.ispyb import TransportManager # Session -except AttributeError: - pass # Ignore if ISPyB credentials environment variable not set - - -def zocalo_cluster_request( - file: Path, - root_folder: str, - messenger: Optional[TransportManager] = None, -): - if messenger: - # Construct path to session directory - path_parts = list(file.parts) - new_path = [] - for p in range(len(path_parts)): - part = path_parts[p] - # Remove leading slash for subsequent rejoining - if part == "/": - part = "" - # Append up to, but not including, root folder - if part.lower() == root_folder.lower(): - break - new_path.append(part) - session_dir = Path("/".join(new_path)) - - messenger.send( - "processing_recipe", - { - "recipes": ["clem-lif-to-stack"], - "parameters": { - "session_dir": str(session_dir), - "lif_path": str(file), - "root_dir": root_folder, - }, - }, - new_connection=True, - ) - else: - raise Exception("Unable to find transport manager") diff --git a/src/murfey/workflows/tiff_to_stack.py b/src/murfey/workflows/tiff_to_stack.py deleted file mode 100644 index 98fa4f456..000000000 --- a/src/murfey/workflows/tiff_to_stack.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Script to allow Murfey to submit the TIFF-to-stack job to the cluster. -The recipe referred to here is stored on GitLab. 
-""" - -from pathlib import Path -from typing import Optional - -try: - from murfey.server.ispyb import TransportManager # Session -except AttributeError: - pass # Ignore if ISPyB credentials environment variable not set - - -def zocalo_cluster_request( - file: Path, - root_folder: str, - metadata: Optional[Path] = None, - messenger: Optional[TransportManager] = None, -): - if messenger: - # Construct path to session directory - path_parts = list(file.parts) - new_path = [] - for p in range(len(path_parts)): - part = path_parts[p] - # Remove leading slash for subsequent rejoining - if part == "/": - part = "" - # Append up to, but not including, root folder - if part.lower() == root_folder.lower(): - break - new_path.append(part) - session_dir = Path("/".join(new_path)) - - # If no metadata file provided, generate path to one - if metadata is None: - series_name = file.stem.split("--")[0] - metadata = file.parent / "Metadata" / (series_name + ".xlif") - - messenger.send( - "processing_recipe", - { - "recipes": ["clem-tiff-to-stack"], - "parameters": { - "session_dir": str(session_dir), - "tiff_path": str(file), - "root_dir": root_folder, - "metadata": str(metadata), - }, - }, - new_connection=True, - ) - else: - raise Exception("Unable to find transport manager")