From 7b50bb6dfde2bf1a0b06eff1c41863591fb1dbc5 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 2 Oct 2024 19:16:20 +0100 Subject: [PATCH 01/48] Used Optional[] as type hint for Python 3.9 compatibility --- src/murfey/server/demo_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/demo_api.py b/src/murfey/server/demo_api.py index b032c8154..7f209d361 100644 --- a/src/murfey/server/demo_api.py +++ b/src/murfey/server/demo_api.py @@ -142,7 +142,7 @@ def machine_info() -> Optional[MachineConfig]: @lru_cache(maxsize=5) @router.get("/instruments/{instrument_name}/machine") -def machine_info_by_name(instrument_name: str) -> MachineConfig | None: +def machine_info_by_name(instrument_name: str) -> Optional[MachineConfig]: if settings.murfey_machine_configuration: return from_file(Path(settings.murfey_machine_configuration), instrument_name)[ instrument_name From e796663ac6b2b378e484e7bdfddc5b7b1f382c36 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 2 Oct 2024 19:18:00 +0100 Subject: [PATCH 02/48] Raise Exception if no TransportManager instance is found when running this Murfey workflow --- src/murfey/workflows/lif_to_stack.py | 2 ++ src/murfey/workflows/tiff_to_stack.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/murfey/workflows/lif_to_stack.py b/src/murfey/workflows/lif_to_stack.py index a862edaf3..5cae568f6 100644 --- a/src/murfey/workflows/lif_to_stack.py +++ b/src/murfey/workflows/lif_to_stack.py @@ -44,3 +44,5 @@ def zocalo_cluster_request( }, new_connection=True, ) + else: + raise Exception("Unable to find transport manager") diff --git a/src/murfey/workflows/tiff_to_stack.py b/src/murfey/workflows/tiff_to_stack.py index 44073fed0..98fa4f456 100644 --- a/src/murfey/workflows/tiff_to_stack.py +++ b/src/murfey/workflows/tiff_to_stack.py @@ -51,3 +51,5 @@ def zocalo_cluster_request( }, new_connection=True, ) + else: + raise Exception("Unable to find transport manager") From 0167f962c187f3cefcfbfa47526a4ec97fab0ee9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 2 Oct 2024 19:22:11 +0100 Subject: [PATCH 03/48] Corrected database call in 'validate_and_sanitise' function; simplified LIF file registration and processing parameters; corrected 'murfey_workflow' call --- src/murfey/server/api/clem.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index cd408ac60..96e19e102 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -23,13 +23,13 @@ CLEMTIFFFile, ) from murfey.util.db import Session as MurfeySession -from murfey.util.models import LifFileInfo, TiffSeriesInfo +from murfey.util.models import TiffSeriesInfo # Use backport from importlib_metadata for Python <3.10 if sys.version_info.major == 3 and sys.version_info.minor < 10: - from importlib_metadata import entry_points + from importlib_metadata import EntryPoint, entry_points else: - from importlib.metadata import entry_points + from importlib.metadata import EntryPoint, entry_points # Set up logger logger = getLogger("murfey.server.api.clem") @@ -52,7 +52,11 @@ """ -def validate_and_sanitise(file: Path, session_id: int) -> Path: +def validate_and_sanitise( + file: Path, + session_id: int, + db: Session, +) -> Path: """ Performs validation and sanitisation on the incoming file paths, ensuring that no forbidden characters are present and that the the path points only to allowed @@ -60,13 +64,17 @@ def validate_and_sanitise(file: Path, session_id: int) -> Path: Returns the file path as a sanitised string that can be converted into a Path object again. + + NOTE: Due to the instrument name query, 'db' now needs to be passed as an + explicit variable to this function from within a FastAPI endpoint, as using the + instance that was imported directly won't load it in the correct state. """ # Resolve symlinks and directory changes to get full file path full_path = path.normpath(path.realpath(file)) instrument_name = ( - murfey_db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) .one() .instrument_name ) @@ -144,7 +152,7 @@ def get_db_entry( # Validate file path if provided if file_path is not None: try: - file_path = validate_and_sanitise(file_path, session_id) + file_path = validate_and_sanitise(file_path, session_id, db) except Exception: raise Exception @@ -220,7 +228,7 @@ def register_lif_file( # Add metadata information if provided if master_metadata is not None: try: - master_metadata = validate_and_sanitise(master_metadata, session_id) + master_metadata = validate_and_sanitise(master_metadata, session_id, db) clem_lif_file.master_metadata = str(master_metadata) except Exception: logger.warning(traceback.format_exc()) @@ -621,7 +629,7 @@ def register_image_stack( @router.post("/sessions/{session_id}/lif_to_stack") # API posts to this URL def lif_to_stack( session_id: int, # Used by the decorator - lif_info: LifFileInfo, + lif_file: Path, ): # Get command line entry point murfey_workflows = entry_points().select( @@ -629,13 +637,15 @@ def lif_to_stack( ) # Use entry point if found - if murfey_workflows: - murfey_workflows[0].load()( + if len(murfey_workflows) == 1: + workflow: EntryPoint = list(murfey_workflows)[0] + workflow.load()( # Match the arguments found in murfey.workflows.lif_to_stack - file=lif_info.name, + file=lif_file, root_folder="images", messenger=_transport_object, ) + return True # Raise error if Murfey workflow not found else: raise RuntimeError("The relevant Murfey workflow was not found") @@ -653,7 +663,8 @@ def tiff_to_stack( # Use entry point if found if murfey_workflows: - murfey_workflows[0].load()( + workflow: EntryPoint = list(murfey_workflows)[0] + workflow.load()( # Match the arguments found in murfey.workflows.tiff_to_stack file=tiff_info.tiff_files[0], # Pass it only one file from the list root_folder="images", From 6dac7042005d8928a6a8f0d88e1f4696d9632431 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 2 Oct 2024 19:29:51 +0100 Subject: [PATCH 04/48] Added functions to post requests to the server to register LIF and TIFF files; packaged posts as functions --- src/murfey/client/contexts/clem.py | 217 ++++++++++++++++++++++------- 1 file changed, 168 insertions(+), 49 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 3688ac5e0..07273d3fb 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -7,6 +7,7 @@ from datetime import datetime from pathlib import Path from typing import Dict, Generator, List, Optional +from urllib.parse import quote from xml.etree import ElementTree as ET from defusedxml.ElementTree import parse @@ -128,13 +129,13 @@ def post_transfer( logger.warning(f"No source found for file {transferred_file}") return False - # Get the Path on the DLS file system - file_path = _file_transferred_to( + # Get the file Path at the destination + destination_file = _file_transferred_to( environment=environment, source=source, file_path=transferred_file, ) - if not file_path: + if not destination_file: logger.warning( f"File {transferred_file.name!r} not found on the storage system" ) @@ -167,16 +168,22 @@ def post_transfer( if len(transferred_file.stem.split("--")) == 3: series_name = "/".join( [ - *file_path.parent.parts[-2:], # Upper 2 parent directories - file_path.stem.split("--")[0], + *destination_file.parent.parts[ + -2: + ], # Upper 2 parent directories + destination_file.stem.split("--")[0], ] ) # When this a repeated position elif len(transferred_file.stem.split("--")) == 4: series_name = "/".join( [ - *file_path.parent.parts[-2:], # Upper 2 parent directories - "--".join(file_path.stem.split("--")[i] for i in [0, -1]), + *destination_file.parent.parts[ + -2: + ], # Upper 2 parent directories + "--".join( + destination_file.stem.split("--")[i] for i in [0, -1] + ), ] ) else: @@ -196,7 +203,7 @@ def post_transfer( if series_name not in self._tiff_timestamps.keys(): self._tiff_timestamps[series_name] = [] # Append information to list - self._tiff_series[series_name].append(str(file_path)) + self._tiff_series[series_name].append(str(destination_file)) self._tiff_sizes[series_name].append(transferred_file.stat().st_size) self._tiff_timestamps[series_name].append( transferred_file.stat().st_ctime @@ -205,6 +212,11 @@ def post_transfer( f"Created TIFF file dictionary entries for {series_name!r}" ) + # Register the TIFF file in the database + post_result = self.register_tiff_file(destination_file, environment) + if post_result is False: + return False + # Process XLIF files if transferred_file.suffix == ".xlif": @@ -230,7 +242,7 @@ def post_transfer( # XLIF files don't have the "--ZXX--CXX" additions in the file name # But they have "/Metadata/" as the immediate parent series_name = "/".join( - [*file_path.parent.parent.parts[-2:], file_path.stem] + [*destination_file.parent.parent.parts[-2:], destination_file.stem] ) # The previous 2 parent directories should be unique enough logger.debug( f"File {transferred_file.name!r} given the series name {series_name!r}" @@ -262,11 +274,14 @@ def post_transfer( # Update dictionary entries self._files_in_series[series_name] = num_files - self._series_metadata[series_name] = str(file_path) + self._series_metadata[series_name] = str(destination_file) self._metadata_size[series_name] = transferred_file.stat().st_size self._metadata_timestamp[series_name] = transferred_file.stat().st_ctime logger.debug(f"Created dictionary entries for {series_name!r} metadata") + # A new copy of the metadata file is created in 'processed', so no need + # to register this instance of it + # Post message if all files for the associated series have been collected # .get(series_name, 0) returns 0 if no associated key is found if not len(self._tiff_series.get(series_name, [])): @@ -284,26 +299,20 @@ def post_transfer( f"Collected expected number of TIFF files for series {series_name!r}; posting job to server" ) - # Construct URL for Murfey server to communicate with - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/tiff_to_stack" - if not url: - logger.warning("No URL found for the environment") - return True - # Post the message and log any errors that arise - capture_post( - url, - json={ - "series_name": series_name, - "tiff_files": self._tiff_series[series_name], - "tiff_sizes": self._tiff_sizes[series_name], - "tiff_timestamps": self._tiff_timestamps[series_name], - "series_metadata": self._series_metadata[series_name], - "metadata_size": self._metadata_size[series_name], - "metadata_timestamp": self._metadata_timestamp[series_name], - "description": "", - }, - ) + tiff_dataset = { + "series_name": series_name, + "tiff_files": self._tiff_series[series_name], + "tiff_sizes": self._tiff_sizes[series_name], + "tiff_timestamps": self._tiff_timestamps[series_name], + "series_metadata": self._series_metadata[series_name], + "metadata_size": self._metadata_size[series_name], + "metadata_timestamp": self._metadata_timestamp[series_name], + "description": "", + } + post_result = self.process_tiff_series(tiff_dataset, environment) + if post_result is False: + return False return True else: logger.debug(f"TIFF series {series_name!r} is still being processed") @@ -323,32 +332,142 @@ def post_transfer( return True logger.debug( - f"File {transferred_file.name!r} is a valid LIF file; posting job to server" + f"File {transferred_file.name!r} is a valid LIF file; starting processing" ) - # Construct the URL for the Murfey server to communicate with - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/lif_to_stack" - # Type checking to satisfy MyPy - if not url: - logger.warning("No URL found for the environment") - return True - - # Get the Path on the DLS file system - file_path = _file_transferred_to( + # Get the Path at the destination + destination_file = _file_transferred_to( environment=environment, source=source, file_path=transferred_file, ) + if not destination_file: + logger.warning( + f"File {transferred_file.name!r} not found on the storage system" + ) + return False + + # Post URL to register LIF file in database + post_result = self.register_lif_file(destination_file, environment) + if post_result is False: + return False + logger.debug(f"Registered {destination_file.name!r} in the database") + + # Post URL to trigger job and convert LIF file into image stacks + post_result = self.process_lif_file(destination_file, environment) + if post_result is False: + return False + logger.debug(f"Started preprocessing of {destination_file.name!r}") - # Post the message and logs it if there's an error - capture_post( - url, - json={ - "name": str(file_path), - "size": transferred_file.stat().st_size, # File size, in bytes - "timestamp": transferred_file.stat().st_ctime, # For Unix systems, shows last metadata change - "description": "", - }, - ) return True return True + + def register_lif_file( + self, + lif_file: Path, + environment: MurfeyInstanceEnvironment, + ): + """ + Constructs the URL and dictionary to be posted to the server, which will then + register the LIF file in the database correctly as part of the CLEM workflow. + """ + try: + # Construct URL to post to post the request to + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/lif_files?lif_file={quote(str(lif_file), safe='')}" + # Validate + if not url: + logger.error( + "URL could not be constructed from the environment and file path" + ) + return ValueError + + # Send the message + capture_post(url) + return True + + except Exception: + logger.error( + "Error encountered when registering the LIF file in the database" + ) + return False + + def process_lif_file( + self, + lif_file: Path, + environment: MurfeyInstanceEnvironment, + ): + """ + Constructs the URL and dictionary to be posted to the server, which will then + trigger the preprocessing of the LIF file. + """ + + try: + # Construct the URL to post the request to + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/lif_to_stack?lif_file={quote(str(lif_file), safe='')}" + # Validate + if not url: + logger.error( + "URL could not be constructed from the environment and file path" + ) + return ValueError + + # Send the message + capture_post(url) + return True + + except Exception: + logger.error("Error encountered processing LIF file") + return False + + def register_tiff_file( + self, + tiff_file: Path, + environment: MurfeyInstanceEnvironment, + ): + """ + Constructs the URL and dictionary to be posted to the server, which will then + register the TIFF file in the database correctly as part of the CLEM workflow. + """ + + try: + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/tiff_files?tiff_file={quote(str(tiff_file), safe='')}" + if not url: + logger.error( + "URL could not be constructed from the environment and file path" + ) + return ValueError + + # Send the message + capture_post(url) + return True + + except Exception: + logger.error( + "Error encountered when registering the TIFF file in the database" + ) + + def process_tiff_series( + self, + tiff_dataset: dict, + environment: MurfeyInstanceEnvironment, + ): + """ + Constructs the URL and dictionary to be posted to the server, which will then + trigger the preprocessing of this instance of a TIFF series. + """ + + try: + # Construct URL for Murfey server to communicate with + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/tiff_to_stack" + if not url: + logger.error( + "URL could not be constructed from the environment and file path" + ) + return ValueError + + # Send the message + capture_post(url, json=tiff_dataset) + return True + + except Exception: + logger.error("Error encountered processing the TIFF series") From bbd3e29e616651d606ffb72781597b182f78b6c0 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 3 Oct 2024 11:51:54 +0100 Subject: [PATCH 05/48] Used Path object to resolve file path in 'validate_and_sanitise'; added additional 'raise Exception' to catch unexpected ones when loading rsync_basepath --- src/murfey/server/api/clem.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 96e19e102..6f9095dac 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -4,7 +4,6 @@ import sys import traceback from logging import getLogger -from os import path from pathlib import Path from typing import Optional, Type, Union @@ -71,14 +70,14 @@ def validate_and_sanitise( """ # Resolve symlinks and directory changes to get full file path - full_path = path.normpath(path.realpath(file)) + full_path = Path(file).resolve() + # Use machine configuration to validate which file base paths are accepted from instrument_name = ( db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) .one() .instrument_name ) - # Use machine configuration to validate file paths used here machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -89,6 +88,8 @@ def validate_and_sanitise( # Print to troubleshoot logger.warning(f"Base path {rsync_basepath!r} is too short") base_path = rsync_basepath.as_posix() + except Exception: + raise Exception("Unexpected exception occurred when loading the file base path") # Check that full file path doesn't contain unallowed characters # Currently allows only: @@ -97,18 +98,22 @@ def validate_and_sanitise( # - periods, # - dashes, # - forward slashes ("/") - if bool(re.fullmatch(r"^[\w\s\.\-/]+$", full_path)) is False: - raise ValueError(f"Unallowed characters present in {file!r}") + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: + raise ValueError(f"Unallowed characters present in {file}") # Check that it's not accessing somehwere it's not allowed if not str(full_path).startswith(str(base_path)): - raise ValueError(f"{file!r} points to a directory that is not permitted") + raise ValueError(f"{file} points to a directory that is not permitted") + + # Check that it's a file, not a directory + if full_path.is_file() is False: + raise ValueError(f"{file} is not a file") # Check that it is of a permitted file type - if f".{full_path.rsplit('.', 1)[-1]}" not in valid_file_types: - raise ValueError("File is not a permitted file format") + if f"{full_path.suffix}" not in valid_file_types: + raise ValueError(f"{full_path.suffix} is not a permitted file format") - return Path(full_path) + return full_path def get_db_entry( From b6a4297ff37d14f7a98fff346e1b737590a89fbc Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 9 Oct 2024 15:44:17 +0100 Subject: [PATCH 06/48] Corrected returned booleans at the end of different workflow branches --- src/murfey/client/contexts/clem.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 07273d3fb..5b9eaa773 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -313,7 +313,7 @@ def post_transfer( post_result = self.process_tiff_series(tiff_dataset, environment) if post_result is False: return False - return True + else: logger.debug(f"TIFF series {series_name!r} is still being processed") @@ -322,14 +322,14 @@ def post_transfer( # Type checking to satisfy MyPy if not environment: logger.warning("No environment passed in") - return True + return False # Location of the file on the client PC source = _get_source(transferred_file, environment) # Type checking to satisfy MyPy if not source: logger.warning(f"No source found for file {transferred_file}") - return True + return False logger.debug( f"File {transferred_file.name!r} is a valid LIF file; starting processing" @@ -351,15 +351,15 @@ def post_transfer( post_result = self.register_lif_file(destination_file, environment) if post_result is False: return False - logger.debug(f"Registered {destination_file.name!r} in the database") + logger.info(f"Registered {destination_file.name!r} in the database") # Post URL to trigger job and convert LIF file into image stacks post_result = self.process_lif_file(destination_file, environment) if post_result is False: return False - logger.debug(f"Started preprocessing of {destination_file.name!r}") + logger.info(f"Started preprocessing of {destination_file.name!r}") - return True + # Function has completed as expected return True def register_lif_file( From 9d03fa2d736cf1daeb0697df8d553f9482271e78 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 9 Oct 2024 15:49:28 +0100 Subject: [PATCH 07/48] Corrected inconsistent returns and raises in functions --- src/murfey/client/contexts/clem.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 5b9eaa773..df390b264 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -379,15 +379,15 @@ def register_lif_file( logger.error( "URL could not be constructed from the environment and file path" ) - return ValueError + return False # Send the message capture_post(url) return True - except Exception: + except Exception as e: logger.error( - "Error encountered when registering the LIF file in the database" + f"Error encountered when registering the LIF file in the database: {e}" ) return False @@ -409,14 +409,14 @@ def process_lif_file( logger.error( "URL could not be constructed from the environment and file path" ) - return ValueError + return False # Send the message capture_post(url) return True - except Exception: - logger.error("Error encountered processing LIF file") + except Exception as e: + logger.error(f"Error encountered processing LIF file: {e}") return False def register_tiff_file( @@ -435,16 +435,17 @@ def register_tiff_file( logger.error( "URL could not be constructed from the environment and file path" ) - return ValueError + return False # Send the message capture_post(url) return True - except Exception: + except Exception as e: logger.error( - "Error encountered when registering the TIFF file in the database" + f"Error encountered when registering the TIFF file in the database: {e}" ) + return False def process_tiff_series( self, @@ -463,11 +464,12 @@ def process_tiff_series( logger.error( "URL could not be constructed from the environment and file path" ) - return ValueError + return False # Send the message capture_post(url, json=tiff_dataset) return True - except Exception: - logger.error("Error encountered processing the TIFF series") + except Exception as e: + logger.error(f"Error encountered processing the TIFF series: {e}") + return False From dadb02d6ec8465ccfb969f4948e3e5926861243f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 9 Oct 2024 15:58:33 +0100 Subject: [PATCH 08/48] Simplified CLEM BaseModels and updated CLEMContext file handling logic to reflect that --- src/murfey/client/contexts/clem.py | 19 ------------------- src/murfey/util/models.py | 12 ------------ 2 files changed, 31 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index df390b264..e2191fc9c 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -93,11 +93,7 @@ def __init__(self, acquisition_software: str, basepath: Path): self._basepath = basepath # CLEM contexts for "auto-save" acquisition mode self._tiff_series: Dict[str, List[str]] = {} # {Series name : TIFF path list} - self._tiff_timestamps: Dict[str, List[float]] = {} # {Series name : Timestamps} - self._tiff_sizes: Dict[str, List[int]] = {} # {Series name : File sizes} self._series_metadata: Dict[str, str] = {} # {Series name : Metadata file path} - self._metadata_timestamp: Dict[str, float] = {} # {Series name : Timestamp} - self._metadata_size: Dict[str, int] = {} # {Series name : File size} self._files_in_series: Dict[str, int] = {} # {Series name : Total TIFFs} def post_transfer( @@ -198,16 +194,8 @@ def post_transfer( # Create key-value pairs containing empty list if not already present if series_name not in self._tiff_series.keys(): self._tiff_series[series_name] = [] - if series_name not in self._tiff_sizes.keys(): - self._tiff_sizes[series_name] = [] - if series_name not in self._tiff_timestamps.keys(): - self._tiff_timestamps[series_name] = [] # Append information to list self._tiff_series[series_name].append(str(destination_file)) - self._tiff_sizes[series_name].append(transferred_file.stat().st_size) - self._tiff_timestamps[series_name].append( - transferred_file.stat().st_ctime - ) logger.debug( f"Created TIFF file dictionary entries for {series_name!r}" ) @@ -275,8 +263,6 @@ def post_transfer( # Update dictionary entries self._files_in_series[series_name] = num_files self._series_metadata[series_name] = str(destination_file) - self._metadata_size[series_name] = transferred_file.stat().st_size - self._metadata_timestamp[series_name] = transferred_file.stat().st_ctime logger.debug(f"Created dictionary entries for {series_name!r} metadata") # A new copy of the metadata file is created in 'processed', so no need @@ -303,12 +289,7 @@ def post_transfer( tiff_dataset = { "series_name": series_name, "tiff_files": self._tiff_series[series_name], - "tiff_sizes": self._tiff_sizes[series_name], - "tiff_timestamps": self._tiff_timestamps[series_name], "series_metadata": self._series_metadata[series_name], - "metadata_size": self._metadata_size[series_name], - "metadata_timestamp": self._metadata_timestamp[series_name], - "description": "", } post_result = self.process_tiff_series(tiff_dataset, environment) if post_result is False: diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 192cb9e2e..53a0ebebc 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -154,22 +154,10 @@ class FractionationParameters(BaseModel): """ -class LifFileInfo(BaseModel): - name: Path - size: int - timestamp: float - description: str = "" - - class TiffSeriesInfo(BaseModel): series_name: str tiff_files: List[Path] - tiff_sizes: List[int] - tiff_timestamps: List[float] series_metadata: Path - metadata_size: int - metadata_timestamp: float - description: str = "" """ From c69a22f504cdffdfbb1ab34973f05c8377789804 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 9 Oct 2024 17:03:23 +0100 Subject: [PATCH 09/48] Allow forward slashes in series name when validating input --- src/murfey/server/api/clem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 6f9095dac..bcbdde134 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -163,7 +163,7 @@ def get_db_entry( # Validate series name to use if series_name is not None: - if bool(re.fullmatch(r"^[\w\s\.\-]+$", series_name)) is False: + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: raise ValueError("One or more characters in the string are not permitted") # Return database entry if it exists From 276ee54bbec9360c534e95811657953849958f06 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 10 Oct 2024 18:53:49 +0100 Subject: [PATCH 10/48] Added folder to store module feedback callback workflows in --- src/murfey/server/feedback/clem.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 src/murfey/server/feedback/clem.py diff --git a/src/murfey/server/feedback/clem.py b/src/murfey/server/feedback/clem.py new file mode 100644 index 000000000..449223259 --- /dev/null +++ b/src/murfey/server/feedback/clem.py @@ -0,0 +1,6 @@ +""" +Functions to process the requests received by Murfey related to the CLEM workflow. + +The CLEM-related file registration API endpoints can eventually be moved here, since +the file registration processes all take place on the server side only. +""" From f6fd56db329d19470e763cc4dcb8b682e88727c4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 10 Oct 2024 18:56:29 +0100 Subject: [PATCH 11/48] Added feedback callback elif-blocks for CLEM workflows --- src/murfey/server/__init__.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index ae49f81a5..b13327075 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2944,6 +2944,14 @@ def feedback_callback(header: dict, message: dict) -> None: if _transport_object: _transport_object.transport.ack(header) return None + elif message["register"] == "register_lif_preprocessing_result": + # Write a function to register received CLEM LIF processing results + if _transport_object: + _transport_object.transport.ack(header) + elif message["register"] == "register_tiff_preprocessing_result": + # Write a function to register received CLEM TIFF processing results + if _transport_object: + _transport_object.transport.ack(header) if _transport_object: _transport_object.transport.nack(header, requeue=False) return None From 5ef721834187038df0b0924a980ad0b219680ea8 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 10 Oct 2024 18:57:48 +0100 Subject: [PATCH 12/48] Updated 'lif_to_stack' API endpoint to load and pass on more information --- src/murfey/server/api/clem.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index bcbdde134..8d8edbf14 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -635,6 +635,7 @@ def register_image_stack( def lif_to_stack( session_id: int, # Used by the decorator lif_file: Path, + db: Session = murfey_db, ): # Get command line entry point murfey_workflows = entry_points().select( @@ -643,14 +644,26 @@ def lif_to_stack( # Use entry point if found if len(murfey_workflows) == 1: + + # Get instrument name from the database + # This is needed in order to load the correct config file + row: MurfeySession = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = row.instrument_name + + # Pass arguments along to the correct workflow workflow: EntryPoint = list(murfey_workflows)[0] workflow.load()( # Match the arguments found in murfey.workflows.lif_to_stack file=lif_file, root_folder="images", + session_id=session_id, + instrument_name=instrument_name, messenger=_transport_object, ) return True + # Raise error if Murfey workflow not found else: raise RuntimeError("The relevant Murfey workflow was not found") From d29b8ed1eccbe2da0404158c92753e358a10e2d2 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 10 Oct 2024 18:58:45 +0100 Subject: [PATCH 13/48] Updated 'lif_to_stack' workflow to handle additional zocalo wrapper parameters --- src/murfey/workflows/lif_to_stack.py | 33 ++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/murfey/workflows/lif_to_stack.py b/src/murfey/workflows/lif_to_stack.py index 5cae568f6..753e24098 100644 --- a/src/murfey/workflows/lif_to_stack.py +++ b/src/murfey/workflows/lif_to_stack.py @@ -6,6 +6,8 @@ from pathlib import Path from typing import Optional +from murfey.util.config import get_machine_config + try: from murfey.server.ispyb import TransportManager # Session except AttributeError: @@ -15,12 +17,15 @@ def zocalo_cluster_request( file: Path, root_folder: str, + session_id: int, # Provided by the client via the API endpoint + instrument_name: str, # Acquired by looking up the Session table messenger: Optional[TransportManager] = None, ): if messenger: - # Construct path to session directory path_parts = list(file.parts) - new_path = [] + + # Construct path to session directory + session_dir_parts = [] for p in range(len(path_parts)): part = path_parts[p] # Remove leading slash for subsequent rejoining @@ -29,9 +34,26 @@ def zocalo_cluster_request( # Append up to, but not including, root folder if part.lower() == root_folder.lower(): break - new_path.append(part) - session_dir = Path("/".join(new_path)) + session_dir_parts.append(part) + session_dir = Path("/".join(session_dir_parts)) + + # Construct the job name + job_name_parts = [] + trigger = False + for p in range(len(path_parts)): + part = path_parts[p].replace(" ", "_") # Remove spaces + if trigger is True: + job_name_parts.append(part) + # Start appending at the level below the root folder + if part.lower() == root_folder.lower(): + trigger = True + job_name = "--".join(job_name_parts) + + # Load machine config to get the feedback queue + machine_config = get_machine_config() + feedback_queue = machine_config[instrument_name].feedback_queue + # Send the message messenger.send( "processing_recipe", { @@ -40,6 +62,9 @@ def zocalo_cluster_request( "session_dir": str(session_dir), "lif_path": str(file), "root_dir": root_folder, + "job_name": job_name, + "feedback_queue": feedback_queue, + "session_id": session_id, }, }, new_connection=True, From 398b09ebad0ee02a8edacda85718f2b7cee6ca33 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 30 Oct 2024 17:27:30 +0000 Subject: [PATCH 14/48] Notes on how 'feedback_callback' works --- src/murfey/server/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index b13327075..0699f1863 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2946,10 +2946,15 @@ def feedback_callback(header: dict, message: dict) -> None: return None elif message["register"] == "register_lif_preprocessing_result": # Write a function to register received CLEM LIF processing results + # _register_lif_preprocessing_results(message) if _transport_object: _transport_object.transport.ack(header) + # When a message is received, it goes into unacked + # When it's acked, it gets removed from the queue + # When it's nacked, it eventually ends up in the DLQ elif message["register"] == "register_tiff_preprocessing_result": # Write a function to register received CLEM TIFF processing results + # _register_tiff_preprocessing_results(message0) if _transport_object: _transport_object.transport.ack(header) if _transport_object: From 7820501675b188e0fadfeff878b95b32749cb288 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 30 Oct 2024 17:32:12 +0000 Subject: [PATCH 15/48] Constructed parameters from file path; updated parameter names --- src/murfey/workflows/lif_to_stack.py | 53 +++++++++++++--------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/src/murfey/workflows/lif_to_stack.py b/src/murfey/workflows/lif_to_stack.py index 753e24098..a4a451312 100644 --- a/src/murfey/workflows/lif_to_stack.py +++ b/src/murfey/workflows/lif_to_stack.py @@ -22,49 +22,46 @@ def zocalo_cluster_request( messenger: Optional[TransportManager] = None, ): if messenger: - path_parts = list(file.parts) + # Use file path parts to construct parameters + path_parts = list((file.parent / file.stem).parts) + # Replace leading "/" in Unix paths + path_parts[0] = "" if path_parts[0] == "/" else path_parts[0] + try: + # Find the position of the root folder in the list + root_index = [p.lower() for p in path_parts].index(root_folder.lower()) + except ValueError: + raise Exception( + f"Unable to find the root folder {root_folder!r} in the file path {file!r}" + ) - # Construct path to session directory - session_dir_parts = [] - for p in range(len(path_parts)): - part = path_parts[p] - # Remove leading slash for subsequent rejoining - if part == "/": - part = "" - # Append up to, but not including, root folder - if part.lower() == root_folder.lower(): - break - session_dir_parts.append(part) - session_dir = Path("/".join(session_dir_parts)) - - # Construct the job name - job_name_parts = [] - trigger = False - for p in range(len(path_parts)): - part = path_parts[p].replace(" ", "_") # Remove spaces - if trigger is True: - job_name_parts.append(part) - # Start appending at the level below the root folder - if part.lower() == root_folder.lower(): - trigger = True - job_name = "--".join(job_name_parts) + # Construct the session + session_dir = "/".join(path_parts[:root_index]) + job_name = "--".join( + [p.replace(" ", "_") if " " in p else p for p in path_parts][ + root_index + 1 : + ] + ) # Load machine config to get the feedback queue machine_config = get_machine_config() feedback_queue = machine_config[instrument_name].feedback_queue # Send the message + # The keys under "parameters" will populate all the matching fields in {} + # in the processing recipe messenger.send( "processing_recipe", { "recipes": ["clem-lif-to-stack"], "parameters": { + # Job parameters + "lif_file": str(file), + "root_folder": root_folder, + # Other recipe parameters "session_dir": str(session_dir), - "lif_path": str(file), - "root_dir": root_folder, + "session_id": session_id, "job_name": job_name, "feedback_queue": feedback_queue, - "session_id": session_id, }, }, new_connection=True, From 51c528c1d99f4019a8e95e5d644a0d123687e824 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 30 Oct 2024 17:33:42 +0000 Subject: [PATCH 16/48] Fixed comments --- src/murfey/workflows/lif_to_stack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/workflows/lif_to_stack.py b/src/murfey/workflows/lif_to_stack.py index a4a451312..04634c54e 100644 --- a/src/murfey/workflows/lif_to_stack.py +++ b/src/murfey/workflows/lif_to_stack.py @@ -34,7 +34,7 @@ def zocalo_cluster_request( f"Unable to find the root folder {root_folder!r} in the file path {file!r}" ) - # Construct the session + # Construct the session and job name session_dir = "/".join(path_parts[:root_index]) job_name = "--".join( [p.replace(" ", "_") if " " in p else p for p in path_parts][ From 2d6d367153f05764a31e5e6cae48da826d7939d5 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 31 Oct 2024 16:08:38 +0000 Subject: [PATCH 17/48] Removed 'instrument_name' parameter from murfey.server.murfey_db --- src/murfey/server/murfey_db.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/murfey/server/murfey_db.py b/src/murfey/server/murfey_db.py index e47c55902..2d0d52cf7 100644 --- a/src/murfey/server/murfey_db.py +++ b/src/murfey/server/murfey_db.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os from functools import partial import yaml @@ -11,8 +10,6 @@ from murfey.util.config import Security, get_security_config -instrument_name = os.getenv("BEAMLINE", "") - def url(security_config: Security | None = None) -> str: security_config = security_config or get_security_config() From e8447e0c6c40bb989105eeb860630f5d014fbbe4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 1 Nov 2024 16:04:08 +0000 Subject: [PATCH 18/48] Refactored CLEM Murfey workflows --- pyproject.toml | 8 ++++--- src/murfey/workflows/clem/__init__.py | 0 .../workflows/{ => clem}/lif_to_stack.py | 0 src/murfey/workflows/clem/register_results.py | 22 +++++++++++++++++++ .../workflows/{ => clem}/tiff_to_stack.py | 0 5 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 src/murfey/workflows/clem/__init__.py rename src/murfey/workflows/{ => clem}/lif_to_stack.py (100%) create mode 100644 src/murfey/workflows/clem/register_results.py rename src/murfey/workflows/{ => clem}/tiff_to_stack.py (100%) diff --git a/pyproject.toml b/pyproject.toml index 179348107..116b3ac53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,9 +94,11 @@ murfey = "murfey.client:run" "murfey.transfer" = "murfey.cli.transfer:run" [project.entry-points."murfey.auth.token_validation"] "password" = "murfey.server.api.auth:password_token_validation" -[project.entry-points."murfey.workflows"] -"lif_to_stack" = "murfey.workflows.lif_to_stack:zocalo_cluster_request" -"tiff_to_stack" = "murfey.workflows.tiff_to_stack:zocalo_cluster_request" +[project.entry-points."murfey.workflows.clem"] +"lif_to_stack" = "murfey.workflows.clem.lif_to_stack:zocalo_cluster_request" +"register_lif_preprocessing_result" = "murfey.workflows.clem.register_results:register_lif_preprocessing_result" +"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_results:register_tiff_preprocessing_result" +"tiff_to_stack" = "murfey.workflows.clem.tiff_to_stack:zocalo_cluster_request" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/murfey/workflows/lif_to_stack.py b/src/murfey/workflows/clem/lif_to_stack.py similarity index 100% rename from src/murfey/workflows/lif_to_stack.py rename to src/murfey/workflows/clem/lif_to_stack.py diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py new file mode 100644 index 000000000..4d8f0fab6 --- /dev/null +++ b/src/murfey/workflows/clem/register_results.py @@ -0,0 +1,22 @@ +""" +Functions to process the requests received by Murfey related to the CLEM workflow. + +The CLEM-related file registration API endpoints can eventually be moved here, since +the file registration processes all take place on the server side only. +""" + + +def register_lif_preprocessing_result(message: dict): + """ + session_id (recipe) + register (wrapper) + output_files (wrapper) + key1 + key2 + ... + """ + pass + + +def register_tiff_preprocessing_result(message: dict): + pass diff --git a/src/murfey/workflows/tiff_to_stack.py b/src/murfey/workflows/clem/tiff_to_stack.py similarity index 100% rename from src/murfey/workflows/tiff_to_stack.py rename to src/murfey/workflows/clem/tiff_to_stack.py From c4bdfc7de23cccd527447faf7666b8a264b490bd Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 1 Nov 2024 16:31:47 +0000 Subject: [PATCH 19/48] Missed deleting the old file --- src/murfey/server/feedback/clem.py | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 src/murfey/server/feedback/clem.py diff --git a/src/murfey/server/feedback/clem.py b/src/murfey/server/feedback/clem.py deleted file mode 100644 index 449223259..000000000 --- a/src/murfey/server/feedback/clem.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -Functions to process the requests received by Murfey related to the CLEM workflow. - -The CLEM-related file registration API endpoints can eventually be moved here, since -the file registration processes all take place on the server side only. -""" From a01a7865686fa367f4c1ac0b651082100d3532e6 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 1 Nov 2024 16:32:14 +0000 Subject: [PATCH 20/48] Updated TIFF Pydantic model --- src/murfey/server/api/clem.py | 8 ++++---- src/murfey/util/models.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 8d8edbf14..81ee6afdb 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -22,7 +22,7 @@ CLEMTIFFFile, ) from murfey.util.db import Session as MurfeySession -from murfey.util.models import TiffSeriesInfo +from murfey.util.models import TIFFSeriesInfo # Use backport from importlib_metadata for Python <3.10 if sys.version_info.major == 3 and sys.version_info.minor < 10: @@ -639,7 +639,7 @@ def lif_to_stack( ): # Get command line entry point murfey_workflows = entry_points().select( - group="murfey.workflows", name="lif_to_stack" + group="murfey.workflows.clem", name="lif_to_stack" ) # Use entry point if found @@ -672,11 +672,11 @@ def lif_to_stack( @router.post("/sessions/{session_id}/tiff_to_stack") def tiff_to_stack( session_id: int, # Used by the decorator - tiff_info: TiffSeriesInfo, + tiff_info: TIFFSeriesInfo, ): # Get command line entry point murfey_workflows = entry_points().select( - group="murfey.workflows", name="tiff_to_stack" + group="murfey.workflows.clem", name="tiff_to_stack" ) # Use entry point if found diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 53a0ebebc..c61044c13 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -154,7 +154,7 @@ class FractionationParameters(BaseModel): """ -class TiffSeriesInfo(BaseModel): +class TIFFSeriesInfo(BaseModel): series_name: str tiff_files: List[Path] series_metadata: Path From 185158f8c96285f893afc325a85c5ce61c39b9b3 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 1 Nov 2024 18:03:29 +0000 Subject: [PATCH 21/48] Added Pydantic models to validate LIF and TIFF preprocessing result messages --- src/murfey/util/models.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index c61044c13..feb655e39 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -160,6 +160,22 @@ class TIFFSeriesInfo(BaseModel): series_metadata: Path +class LIFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + color: str + parent_lif: Path + + +class TIFFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + color: str + parent_tiffs: List[Path] + + """ FIB === From ad9a4895f14eb311bf436abdea085d06f5e21add Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 1 Nov 2024 18:46:17 +0000 Subject: [PATCH 22/48] Changed 'color' to the more generic 'channel' --- src/murfey/util/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index feb655e39..976b20de0 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -164,7 +164,7 @@ class LIFPreprocessingResult(BaseModel): image_stack: Path metadata: Path series_name: str - color: str + channel: str parent_lif: Path @@ -172,7 +172,7 @@ class TIFFPreprocessingResult(BaseModel): image_stack: Path metadata: Path series_name: str - color: str + channel: str parent_tiffs: List[Path] From c9ffa31ab2405feea45591f017fa646b9cd90287 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 1 Nov 2024 18:52:51 +0000 Subject: [PATCH 23/48] Refactored functions to parse CLEM preprocessing results and started writing function to register results --- src/murfey/server/__init__.py | 10 +- src/murfey/workflows/clem/register_results.py | 296 +++++++++++++++++- 2 files changed, 297 insertions(+), 9 deletions(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 0699f1863..c9b322794 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -51,6 +51,10 @@ get_microscope, get_security_config, ) +from murfey.workflows.clem.register_results import ( + register_lif_preprocessing_result, + register_tiff_preprocessing_result, +) try: from murfey.server.ispyb import TransportManager # Session @@ -2945,16 +2949,14 @@ def feedback_callback(header: dict, message: dict) -> None: _transport_object.transport.ack(header) return None elif message["register"] == "register_lif_preprocessing_result": - # Write a function to register received CLEM LIF processing results - # _register_lif_preprocessing_results(message) + register_lif_preprocessing_result(message, murfey_db) if _transport_object: _transport_object.transport.ack(header) # When a message is received, it goes into unacked # When it's acked, it gets removed from the queue # When it's nacked, it eventually ends up in the DLQ elif message["register"] == "register_tiff_preprocessing_result": - # Write a function to register received CLEM TIFF processing results - # _register_tiff_preprocessing_results(message0) + register_tiff_preprocessing_result(message, murfey_db) if _transport_object: _transport_object.transport.ack(header) if _transport_object: diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py index 4d8f0fab6..3bead3747 100644 --- a/src/murfey/workflows/clem/register_results.py +++ b/src/murfey/workflows/clem/register_results.py @@ -5,18 +5,304 @@ the file registration processes all take place on the server side only. """ +from __future__ import annotations -def register_lif_preprocessing_result(message: dict): +import json +import logging +import re +from pathlib import Path +from typing import Optional, Type, Union + +from sqlalchemy.exc import NoResultFound +from sqlmodel import Session, select + +from murfey.util.config import get_machine_config +from murfey.util.db import ( + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +) +from murfey.util.db import Session as MurfeySession +from murfey.util.models import LIFPreprocessingResult, TIFFPreprocessingResult + +logger = logging.getLogger("murfey.workflows.clem.register_results") + + +def _validate_and_sanitise( + file: Path, + session_id: int, + db: Session, +) -> Path: + """ + Performs validation and sanitisation on the incoming file paths, ensuring that + no forbidden characters are present and that the the path points only to allowed + sections of the file server. + + Returns the file path as a sanitised string that can be converted into a Path + object again. + + NOTE: Due to the instrument name query, 'db' now needs to be passed as an + explicit variable to this function from within a FastAPI endpoint, as using the + instance that was imported directly won't load it in the correct state. + """ + + valid_file_types = ( + ".lif", + ".tif", + ".tiff", + ".xlif", + ".xml", + ) + + # Resolve symlinks and directory changes to get full file path + full_path = Path(file).resolve() + + # Use machine configuration to validate which file base paths are accepted from + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + rsync_basepath = machine_config.rsync_basepath + try: + base_path = list(rsync_basepath.parents)[-2].as_posix() + except IndexError: + # Print to troubleshoot + logger.warning(f"Base path {rsync_basepath!r} is too short") + base_path = rsync_basepath.as_posix() + except Exception: + raise Exception("Unexpected exception occurred when loading the file base path") + + # Check that full file path doesn't contain unallowed characters + # Currently allows only: + # - words (alphanumerics and "_"; \w), + # - spaces (\s), + # - periods, + # - dashes, + # - forward slashes ("/") + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: + raise ValueError(f"Unallowed characters present in {file}") + + # Check that it's not accessing somehwere it's not allowed + if not str(full_path).startswith(str(base_path)): + raise ValueError(f"{file} points to a directory that is not permitted") + + # Check that it's a file, not a directory + if full_path.is_file() is False: + raise ValueError(f"{file} is not a file") + + # Check that it is of a permitted file type + if f"{full_path.suffix}" not in valid_file_types: + raise ValueError(f"{full_path.suffix} is not a permitted file format") + + return full_path + + +def get_db_entry( + db: Session, + # With the database search funcion having been moved out of the FastAPI + # endpoint, the database now has to be explicitly passed within the FastAPI + # endpoint function in order for it to be loaded in the correct state. + table: Type[ + Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, + ] + ], + session_id: int, + file_path: Optional[Path] = None, + series_name: Optional[str] = None, +) -> Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +]: + """ + Searches the CLEM workflow-related tables in the Murfey database for an entry that + matches the file path or series name within a given session. Returns the entry if + a match is found, otherwise register it as a new entry in the database. + """ + + # Validate that parameters are provided correctly + if file_path is None and series_name is None: + raise ValueError( + "One of either 'file_path' or 'series_name' has to be provided" + ) + if file_path is not None and series_name is not None: + raise ValueError("Only one of 'file_path' or 'series_name' should be provided") + + # Validate file path if provided + if file_path is not None: + try: + file_path = _validate_and_sanitise(file_path, session_id, db) + except Exception: + raise Exception + + # Validate series name to use + if series_name is not None: + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: + raise ValueError("One or more characters in the string are not permitted") + + # Return database entry if it exists + try: + db_entry = ( + db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.file_path == str(file_path)) + ).one() + if file_path is not None + else db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.series_name == series_name) + ).one() + ) + # Create and register new entry if not present + except NoResultFound: + db_entry = ( + table( + file_path=str(file_path), + session_id=session_id, + ) + if file_path is not None + else table( + series_name=series_name, + session_id=session_id, + ) + ) + db.add(db_entry) + db.commit() + db.refresh(db_entry) + except Exception: + raise Exception + + return db_entry + + +def register_lif_preprocessing_result( + message: dict, db: Session, demo: bool = False +) -> bool: """ session_id (recipe) register (wrapper) - output_files (wrapper) + result (wrapper) key1 key2 ... """ - pass + session_id: int = ( + int(message["session_id"]) + if not isinstance(message["session_id"], int) + else message["session_id"] + ) + + # Validate message and try and load results + if isinstance(message["result"], str): + try: + json_obj: dict = json.loads(message["result"]) + result = LIFPreprocessingResult(**json_obj) + except Exception as e: + logger.error( + f"Exception encountered when parsing LIF preprocessing result: {e}" + ) + return False + elif isinstance(message["result"], dict): + try: + result = LIFPreprocessingResult(**message["result"]) + except Exception as e: + logger.error( + f"Exception encountered when parsing LIF preprocessing result: {e}" + ) + return False + else: + logger.error( + f"Invalid type for LIF preprocessing result: {type(message['result'])}" + ) + return False + + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) + + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + + clem_lif_file: CLEMLIFFile = get_db_entry( + db=db, + table=CLEMLIFFile, + session_id=session_id, + file_path=result.parent_lif, + ) + + # Link entries to one another + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_lif = clem_lif_file + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) + db.commit() + db.refresh() + + return True + + except Exception as e: + logger.error( + f"Exception encountered when registering LIF preprocessing result: {e}" + ) + return False + + finally: + db.close() + + +def register_tiff_preprocessing_result( + message: dict, db: Session, demo: bool = False +) -> bool: + + session_id: int = ( + int(message["session_id"]) + if not isinstance(message["session_id"], int) + else message["session_id"] + ) + if isinstance(message["result"], str): + json_obj: dict = json.loads(message["result"]) + result = TIFFPreprocessingResult(**json_obj) + elif isinstance(message["result"], dict): + result = TIFFPreprocessingResult(**message["result"]) + else: + logger.error("Invalid type for TIFF preprocessing result") + return False + + if result and session_id: + pass -def register_tiff_preprocessing_result(message: dict): - pass + return True From c3c78a06c4c999bcb184d4f5590686994c6aa87a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 09:33:34 +0000 Subject: [PATCH 24/48] Completed draft of function to register LIF preprocessing results --- src/murfey/workflows/clem/register_results.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py index 3bead3747..6d97f7931 100644 --- a/src/murfey/workflows/clem/register_results.py +++ b/src/murfey/workflows/clem/register_results.py @@ -72,11 +72,12 @@ def _validate_and_sanitise( try: base_path = list(rsync_basepath.parents)[-2].as_posix() except IndexError: - # Print to troubleshoot logger.warning(f"Base path {rsync_basepath!r} is too short") base_path = rsync_basepath.as_posix() - except Exception: - raise Exception("Unexpected exception occurred when loading the file base path") + except Exception as e: + raise Exception( + f"Unexpected exception encountered when loading the file base path: {e}" + ) # Check that full file path doesn't contain unallowed characters # Currently allows only: @@ -262,7 +263,7 @@ def register_lif_preprocessing_result( file_path=result.parent_lif, ) - # Link entries to one another + # Link tables to one another and populate fields clem_img_stk.associated_metadata = clem_metadata clem_img_stk.parent_lif = clem_lif_file clem_img_stk.parent_series = clem_img_series @@ -272,6 +273,19 @@ def register_lif_preprocessing_result( db.commit() db.refresh() + clem_img_series.associated_metadata = clem_metadata + clem_img_series.parent_lif = clem_lif_file + db.add(clem_img_series) + db.commit() + db.refresh() + + clem_metadata.parent_lif = clem_lif_file + db.add(clem_metadata) + db.commit() + db.refresh() + + logger.info(f"LIF preprocessing results registered for {result.series_name}") + return True except Exception as e: From f9858ee2f965ae53a4871eb560b579dc03822d0a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 09:40:50 +0000 Subject: [PATCH 25/48] Added 'number_of_members' as a parameter to register for a series --- src/murfey/util/db.py | 9 ++++++--- src/murfey/util/models.py | 2 ++ src/murfey/workflows/clem/register_results.py | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index ec39d900f..01c5bad60 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -269,9 +269,12 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore ) # One to many # Process checklist for series - images_aligned: bool = False # Image stacks aligned to reference image - rgbs_created: bool = False # Image stacks all colorised - composite_created: bool = False # Composite flattened image created + number_of_members: int = ( + 0 # Expected number of image stacks belonging to this series + ) + images_aligned: bool = False # Have all members been aligned? + rgbs_created: bool = False # Have all members been colourised? + composite_created: bool = False # Has a composite image been created? composite_image: Optional[str] = None # Full path to composite image diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 976b20de0..77bcee218 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -165,6 +165,7 @@ class LIFPreprocessingResult(BaseModel): metadata: Path series_name: str channel: str + number_of_members: int parent_lif: Path @@ -173,6 +174,7 @@ class TIFFPreprocessingResult(BaseModel): metadata: Path series_name: str channel: str + number_of_members: int parent_tiffs: List[Path] diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py index 6d97f7931..33c1eeebd 100644 --- a/src/murfey/workflows/clem/register_results.py +++ b/src/murfey/workflows/clem/register_results.py @@ -275,6 +275,7 @@ def register_lif_preprocessing_result( clem_img_series.associated_metadata = clem_metadata clem_img_series.parent_lif = clem_lif_file + clem_img_series.number_of_members = result.number_of_members db.add(clem_img_series) db.commit() db.refresh() From ecf6d3817c9e26c55ec7cd86256b1f7f8367b64e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 09:51:16 +0000 Subject: [PATCH 26/48] Added 'return None' to CLEM results registration code blocks --- src/murfey/server/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index c9b322794..e752e8ac9 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2952,13 +2952,12 @@ def feedback_callback(header: dict, message: dict) -> None: register_lif_preprocessing_result(message, murfey_db) if _transport_object: _transport_object.transport.ack(header) - # When a message is received, it goes into unacked - # When it's acked, it gets removed from the queue - # When it's nacked, it eventually ends up in the DLQ + return None elif message["register"] == "register_tiff_preprocessing_result": register_tiff_preprocessing_result(message, murfey_db) if _transport_object: _transport_object.transport.ack(header) + return None if _transport_object: _transport_object.transport.nack(header, requeue=False) return None From f7681c92f93c74de9554cec00079e10f3cf2adb4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 10:45:23 +0000 Subject: [PATCH 27/48] Rewrote 'tiff_to_stack' function to work with Zocalo wrapper --- src/murfey/server/api/clem.py | 15 ++++++- src/murfey/workflows/clem/tiff_to_stack.py | 50 ++++++++++++++-------- 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 81ee6afdb..7527d8c3c 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -643,7 +643,7 @@ def lif_to_stack( ) # Use entry point if found - if len(murfey_workflows) == 1: + if murfey_workflows: # Get instrument name from the database # This is needed in order to load the correct config file @@ -673,6 +673,7 @@ def lif_to_stack( def tiff_to_stack( session_id: int, # Used by the decorator tiff_info: TIFFSeriesInfo, + db: Session = murfey_db, ): # Get command line entry point murfey_workflows = entry_points().select( @@ -681,14 +682,26 @@ def tiff_to_stack( # Use entry point if found if murfey_workflows: + + # Load instrument name from database + row: MurfeySession = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = row.instrument_name + + # Pass arguments to correct workflow workflow: EntryPoint = list(murfey_workflows)[0] workflow.load()( # Match the arguments found in murfey.workflows.tiff_to_stack file=tiff_info.tiff_files[0], # Pass it only one file from the list root_folder="images", + session_id=session_id, + instrument_name=instrument_name, metadata=tiff_info.series_metadata, messenger=_transport_object, ) + return True + # Raise error if Murfey workflow not found else: raise RuntimeError("The relevant Murfey workflow was not found") diff --git a/src/murfey/workflows/clem/tiff_to_stack.py b/src/murfey/workflows/clem/tiff_to_stack.py index 98fa4f456..c0ef63945 100644 --- a/src/murfey/workflows/clem/tiff_to_stack.py +++ b/src/murfey/workflows/clem/tiff_to_stack.py @@ -6,6 +6,8 @@ from pathlib import Path from typing import Optional +from murfey.util.config import get_machine_config + try: from murfey.server.ispyb import TransportManager # Session except AttributeError: @@ -13,40 +15,52 @@ def zocalo_cluster_request( - file: Path, + tiff_list: list[Path], root_folder: str, + session_id: int, + instrument_name: str, metadata: Optional[Path] = None, messenger: Optional[TransportManager] = None, ): if messenger: # Construct path to session directory - path_parts = list(file.parts) - new_path = [] - for p in range(len(path_parts)): - part = path_parts[p] - # Remove leading slash for subsequent rejoining - if part == "/": - part = "" - # Append up to, but not including, root folder - if part.lower() == root_folder.lower(): - break - new_path.append(part) - session_dir = Path("/".join(new_path)) + path_parts = list(tiff_list[0].parts) + # Replace leading "/" in Unix paths + path_parts[0] = "" if path_parts[0] == "/" else path_parts[0] + try: + # Find the position of the root folder in the list + root_index = [p.lower() for p in path_parts].index(root_folder.lower()) + except ValueError: + raise Exception( + f"Unable to find the root folder {root_folder!r} in the file path {tiff_list[0]!r}" + ) + # Construct the session and job name + session_dir = "/".join(path_parts[:root_index]) # If no metadata file provided, generate path to one if metadata is None: - series_name = file.stem.split("--")[0] - metadata = file.parent / "Metadata" / (series_name + ".xlif") + series_name = tiff_list[0].stem.split("--")[0] + metadata = tiff_list[0].parent / "Metadata" / (series_name + ".xlif") + + # Load machine config to get the feedback queue + machine_config = get_machine_config() + feedback_queue = machine_config[instrument_name].feedback_queue messenger.send( "processing_recipe", { "recipes": ["clem-tiff-to-stack"], "parameters": { - "session_dir": str(session_dir), - "tiff_path": str(file), - "root_dir": root_folder, + # Job parameters + "tiff_list": tiff_list, + "root_folder": root_folder, "metadata": str(metadata), + "tiff_file": "null", + # Other recipe parameters + "session_dir": str(session_dir), + "session_id": session_id, + "job_name": series_name, + "feedback_queue": feedback_queue, }, }, new_connection=True, From 480a7e432f1eceb10f16bd3677534b2ee0ab6f53 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 10:49:16 +0000 Subject: [PATCH 28/48] Modified URLs for CLEM preprocesisng API endpoints --- src/murfey/client/contexts/clem.py | 4 ++-- src/murfey/server/api/clem.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 2ce45ad27..bab50b215 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -386,7 +386,7 @@ def process_lif_file( try: # Construct the URL to post the request to - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/lif_to_stack?lif_file={quote(str(lif_file), safe='')}" + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/lif_to_stack?lif_file={quote(str(lif_file), safe='')}" # Validate if not url: logger.error( @@ -442,7 +442,7 @@ def process_tiff_series( try: # Construct URL for Murfey server to communicate with - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/tiff_to_stack" + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/tiff_to_stack" if not url: logger.error( "URL could not be constructed from the environment and file path" diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 7527d8c3c..4270d882c 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -631,7 +631,9 @@ def register_image_stack( """ -@router.post("/sessions/{session_id}/lif_to_stack") # API posts to this URL +@router.post( + "/sessions/{session_id}/clem/preprocessing/lif_to_stack" +) # API posts to this URL def lif_to_stack( session_id: int, # Used by the decorator lif_file: Path, @@ -669,7 +671,7 @@ def lif_to_stack( raise RuntimeError("The relevant Murfey workflow was not found") -@router.post("/sessions/{session_id}/tiff_to_stack") +@router.post("/sessions/{session_id}/clem/preprocessing/tiff_to_stack") def tiff_to_stack( session_id: int, # Used by the decorator tiff_info: TIFFSeriesInfo, From 09e85ccacc37d1a9673ebf427a8edcc184e2d4e9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 11:20:07 +0000 Subject: [PATCH 29/48] Stringified TIFF list when submitting to cluster --- src/murfey/workflows/clem/tiff_to_stack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/workflows/clem/tiff_to_stack.py b/src/murfey/workflows/clem/tiff_to_stack.py index c0ef63945..d788a5048 100644 --- a/src/murfey/workflows/clem/tiff_to_stack.py +++ b/src/murfey/workflows/clem/tiff_to_stack.py @@ -52,7 +52,7 @@ def zocalo_cluster_request( "recipes": ["clem-tiff-to-stack"], "parameters": { # Job parameters - "tiff_list": tiff_list, + "tiff_list": str([str(file) for file in tiff_list]), "root_folder": root_folder, "metadata": str(metadata), "tiff_file": "null", From 3cd3344d35e8b822399eb13e19b693e079970726 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 15:04:26 +0000 Subject: [PATCH 30/48] Ensured file paths in recipes are represented canonically --- src/murfey/workflows/clem/lif_to_stack.py | 4 ++-- src/murfey/workflows/clem/tiff_to_stack.py | 15 ++++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/murfey/workflows/clem/lif_to_stack.py b/src/murfey/workflows/clem/lif_to_stack.py index 04634c54e..5355b3235 100644 --- a/src/murfey/workflows/clem/lif_to_stack.py +++ b/src/murfey/workflows/clem/lif_to_stack.py @@ -55,10 +55,10 @@ def zocalo_cluster_request( "recipes": ["clem-lif-to-stack"], "parameters": { # Job parameters - "lif_file": str(file), + "lif_file": f"{str(file)!r}", "root_folder": root_folder, # Other recipe parameters - "session_dir": str(session_dir), + "session_dir": f"{str(session_dir)!r}", "session_id": session_id, "job_name": job_name, "feedback_queue": feedback_queue, diff --git a/src/murfey/workflows/clem/tiff_to_stack.py b/src/murfey/workflows/clem/tiff_to_stack.py index d788a5048..3a45c45c1 100644 --- a/src/murfey/workflows/clem/tiff_to_stack.py +++ b/src/murfey/workflows/clem/tiff_to_stack.py @@ -36,6 +36,11 @@ def zocalo_cluster_request( ) # Construct the session and job name session_dir = "/".join(path_parts[:root_index]) + job_name = "--".join( + [p.replace(" ", "_") if " " in p else p for p in path_parts][ + root_index + 1 : + ] + ) # If no metadata file provided, generate path to one if metadata is None: @@ -52,14 +57,14 @@ def zocalo_cluster_request( "recipes": ["clem-tiff-to-stack"], "parameters": { # Job parameters - "tiff_list": str([str(file) for file in tiff_list]), + "tiff_list": "null", "root_folder": root_folder, - "metadata": str(metadata), - "tiff_file": "null", + "metadata": f"{str(metadata)!r}", + "tiff_file": f"{str(tiff_list[0])!r}", # Other recipe parameters - "session_dir": str(session_dir), + "session_dir": f"{str(session_dir)!r}", "session_id": session_id, - "job_name": series_name, + "job_name": job_name, "feedback_queue": feedback_queue, }, }, From 816d434255634c81139d113bfb9ec13401dcbbd1 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 15:44:37 +0000 Subject: [PATCH 31/48] Adjusted order of parameters --- src/murfey/workflows/clem/tiff_to_stack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/workflows/clem/tiff_to_stack.py b/src/murfey/workflows/clem/tiff_to_stack.py index 3a45c45c1..564e21c53 100644 --- a/src/murfey/workflows/clem/tiff_to_stack.py +++ b/src/murfey/workflows/clem/tiff_to_stack.py @@ -58,9 +58,9 @@ def zocalo_cluster_request( "parameters": { # Job parameters "tiff_list": "null", + "tiff_file": f"{str(tiff_list[0])!r}", "root_folder": root_folder, "metadata": f"{str(metadata)!r}", - "tiff_file": f"{str(tiff_list[0])!r}", # Other recipe parameters "session_dir": f"{str(session_dir)!r}", "session_id": session_id, From 8576eb1cebbb17ab16ba7416c18c603a1a7691c4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 4 Nov 2024 18:26:25 +0000 Subject: [PATCH 32/48] Canonicalisation of strings appears to not be needed when submitting recipe --- src/murfey/workflows/clem/lif_to_stack.py | 4 ++-- src/murfey/workflows/clem/tiff_to_stack.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/murfey/workflows/clem/lif_to_stack.py b/src/murfey/workflows/clem/lif_to_stack.py index 5355b3235..d02e6ac1e 100644 --- a/src/murfey/workflows/clem/lif_to_stack.py +++ b/src/murfey/workflows/clem/lif_to_stack.py @@ -55,10 +55,10 @@ def zocalo_cluster_request( "recipes": ["clem-lif-to-stack"], "parameters": { # Job parameters - "lif_file": f"{str(file)!r}", + "lif_file": f"{str(file)}", "root_folder": root_folder, # Other recipe parameters - "session_dir": f"{str(session_dir)!r}", + "session_dir": f"{str(session_dir)}", "session_id": session_id, "job_name": job_name, "feedback_queue": feedback_queue, diff --git a/src/murfey/workflows/clem/tiff_to_stack.py b/src/murfey/workflows/clem/tiff_to_stack.py index 564e21c53..c72ff3c26 100644 --- a/src/murfey/workflows/clem/tiff_to_stack.py +++ b/src/murfey/workflows/clem/tiff_to_stack.py @@ -58,11 +58,11 @@ def zocalo_cluster_request( "parameters": { # Job parameters "tiff_list": "null", - "tiff_file": f"{str(tiff_list[0])!r}", + "tiff_file": f"{str(tiff_list[0])}", "root_folder": root_folder, - "metadata": f"{str(metadata)!r}", + "metadata": f"{str(metadata)}", # Other recipe parameters - "session_dir": f"{str(session_dir)!r}", + "session_dir": f"{str(session_dir)}", "session_id": session_id, "job_name": job_name, "feedback_queue": feedback_queue, From 4e5b2023d05af37a1f82e9910a30bfcce73e16e8 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 5 Nov 2024 13:42:40 +0000 Subject: [PATCH 33/48] Fixed outdated parameters to pass over to 'tiff_to_stack' workflow --- src/murfey/server/api/clem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 4270d882c..07d89b6d2 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -695,7 +695,7 @@ def tiff_to_stack( workflow: EntryPoint = list(murfey_workflows)[0] workflow.load()( # Match the arguments found in murfey.workflows.tiff_to_stack - file=tiff_info.tiff_files[0], # Pass it only one file from the list + tiff_list=tiff_info.tiff_files, root_folder="images", session_id=session_id, instrument_name=instrument_name, From fbc9fe59d0974a516c1c79c5b221cfde1119769a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 6 Nov 2024 11:59:25 +0000 Subject: [PATCH 34/48] Fixed broken database refreshes and updated logged messages --- src/murfey/workflows/clem/register_results.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py index 33c1eeebd..0eafafccb 100644 --- a/src/murfey/workflows/clem/register_results.py +++ b/src/murfey/workflows/clem/register_results.py @@ -10,6 +10,7 @@ import json import logging import re +import traceback from pathlib import Path from typing import Optional, Type, Union @@ -214,18 +215,16 @@ def register_lif_preprocessing_result( try: json_obj: dict = json.loads(message["result"]) result = LIFPreprocessingResult(**json_obj) - except Exception as e: - logger.error( - f"Exception encountered when parsing LIF preprocessing result: {e}" - ) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing LIF preprocessing result") return False elif isinstance(message["result"], dict): try: result = LIFPreprocessingResult(**message["result"]) - except Exception as e: - logger.error( - f"Exception encountered when parsing LIF preprocessing result: {e}" - ) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing LIF preprocessing result") return False else: logger.error( @@ -271,27 +270,29 @@ def register_lif_preprocessing_result( clem_img_stk.stack_created = True db.add(clem_img_stk) db.commit() - db.refresh() + db.refresh(clem_img_stk) clem_img_series.associated_metadata = clem_metadata clem_img_series.parent_lif = clem_lif_file clem_img_series.number_of_members = result.number_of_members db.add(clem_img_series) db.commit() - db.refresh() + db.refresh(clem_img_series) clem_metadata.parent_lif = clem_lif_file db.add(clem_metadata) db.commit() - db.refresh() - - logger.info(f"LIF preprocessing results registered for {result.series_name}") + db.refresh(clem_metadata) + logger.info( + f"LIF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + ) return True - except Exception as e: + except Exception: + logger.error(traceback.format_exc()) logger.error( - f"Exception encountered when registering LIF preprocessing result: {e}" + f"Exception encountered when registering LIF preprocessing result for {result.series_name!r} {result.channel!r} image stack" ) return False From 8bcd8c0272560c1f4ef56abcd293868bb0861749 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 6 Nov 2024 15:29:40 +0000 Subject: [PATCH 35/48] Updated 'TIFFPreprocessingResult' Pydantic model to parse stringified list --- src/murfey/util/models.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 88f2ac414..7af87f754 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -1,10 +1,11 @@ from __future__ import annotations +from ast import literal_eval from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, validator """ General Models @@ -175,7 +176,23 @@ class TIFFPreprocessingResult(BaseModel): series_name: str channel: str number_of_members: int - parent_tiffs: List[Path] + parent_tiffs: list[Path] + + @validator( + "parent_tiffs", + pre=True, + ) + def parse_stringified_list(cls, value): + if isinstance(value, str): + try: + eval_result = literal_eval(value) + if isinstance(eval_result, list): + parent_tiffs = [Path(p) for p in eval_result] + return parent_tiffs + except (SyntaxError, ValueError): + raise ValueError("Unable to parse input") + # Return value as-is; if it fails, it fails + return value """ From 2cc56247bb8d7908623629883b26cfb0a53641b9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 6 Nov 2024 15:30:14 +0000 Subject: [PATCH 36/48] Completed function to register TIFF preprocessing results --- src/murfey/workflows/clem/register_results.py | 86 +++++++++++++++++-- 1 file changed, 79 insertions(+), 7 deletions(-) diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py index 0eafafccb..0041176b9 100644 --- a/src/murfey/workflows/clem/register_results.py +++ b/src/murfey/workflows/clem/register_results.py @@ -310,15 +310,87 @@ def register_tiff_preprocessing_result( else message["session_id"] ) if isinstance(message["result"], str): - json_obj: dict = json.loads(message["result"]) - result = TIFFPreprocessingResult(**json_obj) + try: + json_obj: dict = json.loads(message["result"]) + result = TIFFPreprocessingResult(**json_obj) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing TIFF preprocessing result") + return False elif isinstance(message["result"], dict): - result = TIFFPreprocessingResult(**message["result"]) + try: + result = TIFFPreprocessingResult(**message["result"]) + except Exception: + logger.error(traceback.format_exc()) + logger.error("Exception encountered when parsing TIFF preprocessing result") else: - logger.error("Invalid type for TIFF preprocessing result") + logger.error( + f"Invalid type for TIFF preprocessing result: {type(message['result'])}" + ) return False - if result and session_id: - pass + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + + # Link tables to one another and populate fields + # Register TIFF files and populate them iteratively first + for file in result.parent_tiffs: + clem_tiff_file: CLEMTIFFFile = get_db_entry( + db=db, + table=CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + db.add(clem_tiff_file) + db.commit() + db.refresh(clem_tiff_file) + + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) + db.commit() + db.refresh(clem_img_stk) + + clem_img_series.associated_metadata = clem_metadata + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) + + logger.info( + f"TIFF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + ) + return True - return True + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Exception encountered when registering TIFF preprocessing result for {result.series_name!r} {result.channel!r} image stack" + ) + return False + + finally: + db.close() From 0811cba4edbb63ec532e07ea022be6dbe2018ed9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 6 Nov 2024 15:36:04 +0000 Subject: [PATCH 37/48] Missed a return statement in 'except' block --- src/murfey/workflows/clem/register_results.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_results.py index 0041176b9..bb111f23e 100644 --- a/src/murfey/workflows/clem/register_results.py +++ b/src/murfey/workflows/clem/register_results.py @@ -323,6 +323,7 @@ def register_tiff_preprocessing_result( except Exception: logger.error(traceback.format_exc()) logger.error("Exception encountered when parsing TIFF preprocessing result") + return False else: logger.error( f"Invalid type for TIFF preprocessing result: {type(message['result'])}" From 3fd77b4680e1fdeec10758652668aed421ad6830 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 6 Nov 2024 16:32:04 +0000 Subject: [PATCH 38/48] Rewrote CLEM preprocessing results registration workflows to make use of entry points defined in pyproject.toml instead --- src/murfey/server/__init__.py | 42 ++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 3df5979ea..e4b0e777b 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -5,6 +5,7 @@ import math import os import subprocess +import sys import time from datetime import datetime from functools import partial, singledispatch @@ -51,10 +52,6 @@ get_microscope, get_security_config, ) -from murfey.workflows.clem.register_results import ( - register_lif_preprocessing_result, - register_tiff_preprocessing_result, -) try: from murfey.server.ispyb import TransportManager # Session @@ -65,6 +62,12 @@ from murfey.util.spa_params import default_spa_parameters from murfey.util.state import global_state +# Import entry_points depending on Python version +if sys.version_info < (3, 10): + from importlib_metadata import EntryPoint, entry_points +else: + from importlib.metadata import EntryPoint, entry_points + try: from importlib.resources import files # type: ignore except ImportError: @@ -2968,15 +2971,28 @@ def feedback_callback(header: dict, message: dict) -> None: if _transport_object: _transport_object.transport.ack(header) return None - elif message["register"] == "register_lif_preprocessing_result": - register_lif_preprocessing_result(message, murfey_db) - if _transport_object: - _transport_object.transport.ack(header) - return None - elif message["register"] == "register_tiff_preprocessing_result": - register_tiff_preprocessing_result(message, murfey_db) - if _transport_object: - _transport_object.transport.ack(header) + elif message["register"] in ( + "register_lif_preprocessing_result", + "register_tiff_preprocessing_result", + ): + murfey_workflows = list( + entry_points().select( + group="murfey.workflows.clem", name=message["register"] + ) + ) + # Run the workflow if a match is found + if len(murfey_workflows) > 0: + workflow: EntryPoint = murfey_workflows[0] + workflow.load()( + message=message, + db=murfey_db, + ) + if _transport_object: + _transport_object.transport.ack(header) + # Nack message if no workflow found for message + else: + if _transport_object: + _transport_object.transport.nack(header) return None if _transport_object: _transport_object.transport.nack(header, requeue=False) From 93c08ea08dc69225989d1deffbf6c8ef36c418ba Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 6 Nov 2024 18:00:00 +0000 Subject: [PATCH 39/48] Adjusted naming convention for LIF and TIFF processing functions and endpoints --- pyproject.toml | 8 ++++---- src/murfey/client/contexts/clem.py | 4 ++-- src/murfey/server/api/clem.py | 16 ++++++++-------- .../{lif_to_stack.py => process_raw_lifs.py} | 0 .../{tiff_to_stack.py => process_raw_tiffs.py} | 0 ...ults.py => register_preprocessing_results.py} | 0 6 files changed, 14 insertions(+), 14 deletions(-) rename src/murfey/workflows/clem/{lif_to_stack.py => process_raw_lifs.py} (100%) rename src/murfey/workflows/clem/{tiff_to_stack.py => process_raw_tiffs.py} (100%) rename src/murfey/workflows/clem/{register_results.py => register_preprocessing_results.py} (100%) diff --git a/pyproject.toml b/pyproject.toml index a8dee4f90..d13aeb99e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,10 +98,10 @@ murfey = "murfey.client:run" [project.entry-points."murfey.config.extraction"] "murfey_machine" = "murfey.util.config:get_extended_machine_config" [project.entry-points."murfey.workflows.clem"] -"lif_to_stack" = "murfey.workflows.clem.lif_to_stack:zocalo_cluster_request" -"register_lif_preprocessing_result" = "murfey.workflows.clem.register_results:register_lif_preprocessing_result" -"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_results:register_tiff_preprocessing_result" -"tiff_to_stack" = "murfey.workflows.clem.tiff_to_stack:zocalo_cluster_request" +"process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" +"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" +"register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" +"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index bab50b215..00cec7dc6 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -386,7 +386,7 @@ def process_lif_file( try: # Construct the URL to post the request to - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/lif_to_stack?lif_file={quote(str(lif_file), safe='')}" + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/process_raw_lifs?lif_file={quote(str(lif_file), safe='')}" # Validate if not url: logger.error( @@ -442,7 +442,7 @@ def process_tiff_series( try: # Construct URL for Murfey server to communicate with - url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/tiff_to_stack" + url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/process_raw_tiffs" if not url: logger.error( "URL could not be constructed from the environment and file path" diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 07d89b6d2..3659f2444 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -632,16 +632,16 @@ def register_image_stack( @router.post( - "/sessions/{session_id}/clem/preprocessing/lif_to_stack" + "/sessions/{session_id}/clem/preprocessing/process_raw_lifs" ) # API posts to this URL -def lif_to_stack( +def process_raw_lifs( session_id: int, # Used by the decorator lif_file: Path, db: Session = murfey_db, ): # Get command line entry point murfey_workflows = entry_points().select( - group="murfey.workflows.clem", name="lif_to_stack" + group="murfey.workflows.clem", name="process_raw_lifs" ) # Use entry point if found @@ -657,7 +657,7 @@ def lif_to_stack( # Pass arguments along to the correct workflow workflow: EntryPoint = list(murfey_workflows)[0] workflow.load()( - # Match the arguments found in murfey.workflows.lif_to_stack + # Match the arguments found in murfey.workflows.process_raw_lifs file=lif_file, root_folder="images", session_id=session_id, @@ -671,15 +671,15 @@ def lif_to_stack( raise RuntimeError("The relevant Murfey workflow was not found") -@router.post("/sessions/{session_id}/clem/preprocessing/tiff_to_stack") -def tiff_to_stack( +@router.post("/sessions/{session_id}/clem/preprocessing/process_raw_tiffs") +def process_raw_tiffs( session_id: int, # Used by the decorator tiff_info: TIFFSeriesInfo, db: Session = murfey_db, ): # Get command line entry point murfey_workflows = entry_points().select( - group="murfey.workflows.clem", name="tiff_to_stack" + group="murfey.workflows.clem", name="process_raw_tiffs" ) # Use entry point if found @@ -694,7 +694,7 @@ def tiff_to_stack( # Pass arguments to correct workflow workflow: EntryPoint = list(murfey_workflows)[0] workflow.load()( - # Match the arguments found in murfey.workflows.tiff_to_stack + # Match the arguments found in murfey.workflows.process_raw_tiffs tiff_list=tiff_info.tiff_files, root_folder="images", session_id=session_id, diff --git a/src/murfey/workflows/clem/lif_to_stack.py b/src/murfey/workflows/clem/process_raw_lifs.py similarity index 100% rename from src/murfey/workflows/clem/lif_to_stack.py rename to src/murfey/workflows/clem/process_raw_lifs.py diff --git a/src/murfey/workflows/clem/tiff_to_stack.py b/src/murfey/workflows/clem/process_raw_tiffs.py similarity index 100% rename from src/murfey/workflows/clem/tiff_to_stack.py rename to src/murfey/workflows/clem/process_raw_tiffs.py diff --git a/src/murfey/workflows/clem/register_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py similarity index 100% rename from src/murfey/workflows/clem/register_results.py rename to src/murfey/workflows/clem/register_preprocessing_results.py From 9cd58ffc09c233e44aa09f657efb407116073e15 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 10:42:21 +0000 Subject: [PATCH 40/48] Replaced 'row' with 'session_row' to make variable more explicit --- src/murfey/server/api/clem.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 3659f2444..4e428c0db 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -649,10 +649,10 @@ def process_raw_lifs( # Get instrument name from the database # This is needed in order to load the correct config file - row: MurfeySession = db.exec( + session_row: MurfeySession = db.exec( select(MurfeySession).where(MurfeySession.id == session_id) ).one() - instrument_name = row.instrument_name + instrument_name = session_row.instrument_name # Pass arguments along to the correct workflow workflow: EntryPoint = list(murfey_workflows)[0] @@ -686,10 +686,10 @@ def process_raw_tiffs( if murfey_workflows: # Load instrument name from database - row: MurfeySession = db.exec( + session_row: MurfeySession = db.exec( select(MurfeySession).where(MurfeySession.id == session_id) ).one() - instrument_name = row.instrument_name + instrument_name = session_row.instrument_name # Pass arguments to correct workflow workflow: EntryPoint = list(murfey_workflows)[0] From befd420659fcfed80ef322ecf120e5363dd46e49 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 10:56:54 +0000 Subject: [PATCH 41/48] Rewrote function to iterate over entry point names and run the match --- src/murfey/server/__init__.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index e4b0e777b..36735272c 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2971,27 +2971,25 @@ def feedback_callback(header: dict, message: dict) -> None: if _transport_object: _transport_object.transport.ack(header) return None - elif message["register"] in ( - "register_lif_preprocessing_result", - "register_tiff_preprocessing_result", + elif ( + message["register"] + in entry_points().select(group="murfey.workflows.clem").names ): - murfey_workflows = list( + # Run the workflow if a match is found + workflow: EntryPoint = list( # Returns a list of either 1 or 0 entry_points().select( group="murfey.workflows.clem", name=message["register"] ) + )[0] + result = workflow.load()( + message=message, + db=murfey_db, ) - # Run the workflow if a match is found - if len(murfey_workflows) > 0: - workflow: EntryPoint = murfey_workflows[0] - workflow.load()( - message=message, - db=murfey_db, - ) - if _transport_object: + print(f"Workflow returned {result}") + if _transport_object: + if result: _transport_object.transport.ack(header) - # Nack message if no workflow found for message - else: - if _transport_object: + else: _transport_object.transport.nack(header) return None if _transport_object: From 5397c0ef4d80fcd5d9eb457182952b2eff5ee89a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 11:00:39 +0000 Subject: [PATCH 42/48] Missed a print statement --- src/murfey/server/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 36735272c..3d7c34176 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2985,7 +2985,6 @@ def feedback_callback(header: dict, message: dict) -> None: message=message, db=murfey_db, ) - print(f"Workflow returned {result}") if _transport_object: if result: _transport_object.transport.ack(header) From fa6c5c849a75b6fe709d0eaf8ef2a5faa50d94ee Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 13:26:46 +0000 Subject: [PATCH 43/48] Reverted entry point group name to 'murfey.workflows' to facilitate generalisability to other workflows in future --- pyproject.toml | 2 +- src/murfey/server/__init__.py | 5 ++--- src/murfey/server/api/clem.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d13aeb99e..8fd41a22c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,7 @@ murfey = "murfey.client:run" "password" = "murfey.server.api.auth:password_token_validation" [project.entry-points."murfey.config.extraction"] "murfey_machine" = "murfey.util.config:get_extended_machine_config" -[project.entry-points."murfey.workflows.clem"] +[project.entry-points."murfey.workflows"] "process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" "process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" "register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 3d7c34176..735437bf1 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2972,13 +2972,12 @@ def feedback_callback(header: dict, message: dict) -> None: _transport_object.transport.ack(header) return None elif ( - message["register"] - in entry_points().select(group="murfey.workflows.clem").names + message["register"] in entry_points().select(group="murfey.workflows").names ): # Run the workflow if a match is found workflow: EntryPoint = list( # Returns a list of either 1 or 0 entry_points().select( - group="murfey.workflows.clem", name=message["register"] + group="murfey.workflows", name=message["register"] ) )[0] result = workflow.load()( diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 4e428c0db..6f475ae0c 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -641,7 +641,7 @@ def process_raw_lifs( ): # Get command line entry point murfey_workflows = entry_points().select( - group="murfey.workflows.clem", name="process_raw_lifs" + group="murfey.workflows", name="process_raw_lifs" ) # Use entry point if found @@ -679,7 +679,7 @@ def process_raw_tiffs( ): # Get command line entry point murfey_workflows = entry_points().select( - group="murfey.workflows.clem", name="process_raw_tiffs" + group="murfey.workflows", name="process_raw_tiffs" ) # Use entry point if found From 1a0057b8fbe6109fd96589b14de80c8c6fbd0c87 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 13:45:43 +0000 Subject: [PATCH 44/48] Optimised code logic for LIF and TIFF processing API endpoints --- src/murfey/server/api/clem.py | 105 +++++++++++++++------------------- 1 file changed, 47 insertions(+), 58 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 6f475ae0c..15c18703b 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -639,37 +639,31 @@ def process_raw_lifs( lif_file: Path, db: Session = murfey_db, ): - # Get command line entry point - murfey_workflows = entry_points().select( - group="murfey.workflows", name="process_raw_lifs" - ) - - # Use entry point if found - if murfey_workflows: - - # Get instrument name from the database - # This is needed in order to load the correct config file - session_row: MurfeySession = db.exec( - select(MurfeySession).where(MurfeySession.id == session_id) - ).one() - instrument_name = session_row.instrument_name - - # Pass arguments along to the correct workflow - workflow: EntryPoint = list(murfey_workflows)[0] - workflow.load()( - # Match the arguments found in murfey.workflows.process_raw_lifs - file=lif_file, - root_folder="images", - session_id=session_id, - instrument_name=instrument_name, - messenger=_transport_object, - ) - return True - - # Raise error if Murfey workflow not found - else: + try: + # Try and load relevant Murfey workflow + workflow: EntryPoint = list( + entry_points().select(group="murfey.workflows", name="process_raw_lifs") + )[0] + except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") + # Get instrument name from the database to load the correct config file + session_row: MurfeySession = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = session_row.instrument_name + + # Pass arguments along to the correct workflow + workflow.load()( + # Match the arguments found in murfey.workflows.clem.process_raw_lifs + file=lif_file, + root_folder="images", + session_id=session_id, + instrument_name=instrument_name, + messenger=_transport_object, + ) + return True + @router.post("/sessions/{session_id}/clem/preprocessing/process_raw_tiffs") def process_raw_tiffs( @@ -677,33 +671,28 @@ def process_raw_tiffs( tiff_info: TIFFSeriesInfo, db: Session = murfey_db, ): - # Get command line entry point - murfey_workflows = entry_points().select( - group="murfey.workflows", name="process_raw_tiffs" - ) - - # Use entry point if found - if murfey_workflows: - - # Load instrument name from database - session_row: MurfeySession = db.exec( - select(MurfeySession).where(MurfeySession.id == session_id) - ).one() - instrument_name = session_row.instrument_name - - # Pass arguments to correct workflow - workflow: EntryPoint = list(murfey_workflows)[0] - workflow.load()( - # Match the arguments found in murfey.workflows.process_raw_tiffs - tiff_list=tiff_info.tiff_files, - root_folder="images", - session_id=session_id, - instrument_name=instrument_name, - metadata=tiff_info.series_metadata, - messenger=_transport_object, - ) - return True - - # Raise error if Murfey workflow not found - else: + try: + # Try and load relevant Murfey workflow + workflow: EntryPoint = list( + entry_points().select(group="murfey.workflows", name="process_raw_tiffs") + )[0] + except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") + + # Get instrument name from the database to load the correct config file + session_row: MurfeySession = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = session_row.instrument_name + + # Pass arguments to correct workflow + workflow.load()( + # Match the arguments found in murfey.workflows.clem.process_raw_tiffs + tiff_list=tiff_info.tiff_files, + root_folder="images", + session_id=session_id, + instrument_name=instrument_name, + metadata=tiff_info.series_metadata, + messenger=_transport_object, + ) + return True From feade10076353e02f38d5cdb4522ca9fac22d100 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 15:16:12 +0000 Subject: [PATCH 45/48] Added 'requeue=False' argument when nacking message --- src/murfey/server/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 735437bf1..413e50076 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2988,7 +2988,8 @@ def feedback_callback(header: dict, message: dict) -> None: if result: _transport_object.transport.ack(header) else: - _transport_object.transport.nack(header) + # Send it directly to DLQ without trying to rerun it + _transport_object.transport.nack(header, requeue=False) return None if _transport_object: _transport_object.transport.nack(header, requeue=False) From 36efb6f9fafc167bbda918c1589e1fa09ec415ed Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 7 Nov 2024 15:17:47 +0000 Subject: [PATCH 46/48] Fixed logic when reading rsync base path from machine config --- src/murfey/server/api/clem.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 15c18703b..d40fee04d 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -81,23 +81,15 @@ def validate_and_sanitise( machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - rsync_basepath = machine_config.rsync_basepath - try: - base_path = list(rsync_basepath.parents)[-2].as_posix() - except IndexError: - # Print to troubleshoot - logger.warning(f"Base path {rsync_basepath!r} is too short") - base_path = rsync_basepath.as_posix() - except Exception: - raise Exception("Unexpected exception occurred when loading the file base path") + base_path = machine_config.rsync_basepath.as_posix() # Check that full file path doesn't contain unallowed characters - # Currently allows only: - # - words (alphanumerics and "_"; \w), - # - spaces (\s), - # - periods, - # - dashes, - # - forward slashes ("/") + # Currently allows only: + # - words (alphanumerics and "_"; \w), + # - spaces (\s), + # - periods, + # - dashes, + # - forward slashes ("/") if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: raise ValueError(f"Unallowed characters present in {file}") From aafc5187f529335724d42f2cec60237ee7d79805 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 8 Nov 2024 12:32:07 +0000 Subject: [PATCH 47/48] Used 'entry_points' from 'backports.entry_points_selectable' instead --- src/murfey/server/__init__.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 413e50076..ae7abcf81 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -5,7 +5,6 @@ import math import os import subprocess -import sys import time from datetime import datetime from functools import partial, singledispatch @@ -57,17 +56,14 @@ from murfey.server.ispyb import TransportManager # Session except AttributeError: pass +from backports.entry_points_selectable import entry_points +from importlib_metadata import EntryPoint # For type hinting only + import murfey.util.db as db from murfey.util import LogFilter from murfey.util.spa_params import default_spa_parameters from murfey.util.state import global_state -# Import entry_points depending on Python version -if sys.version_info < (3, 10): - from importlib_metadata import EntryPoint, entry_points -else: - from importlib.metadata import EntryPoint, entry_points - try: from importlib.resources import files # type: ignore except ImportError: From cb1a570cc6e4bbd84023292a8752f9dac74b9c6f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 8 Nov 2024 12:49:21 +0000 Subject: [PATCH 48/48] Used 'entry_points' instance from 'backports.entry_points_selectable' instead --- src/murfey/server/api/clem.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index d40fee04d..0f948c36f 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -1,12 +1,13 @@ from __future__ import annotations import re -import sys import traceback +from importlib.metadata import EntryPoint # type hinting only from logging import getLogger from pathlib import Path from typing import Optional, Type, Union +from backports.entry_points_selectable import entry_points from fastapi import APIRouter from sqlalchemy.exc import NoResultFound from sqlmodel import Session, select @@ -24,12 +25,6 @@ from murfey.util.db import Session as MurfeySession from murfey.util.models import TIFFSeriesInfo -# Use backport from importlib_metadata for Python <3.10 -if sys.version_info.major == 3 and sys.version_info.minor < 10: - from importlib_metadata import EntryPoint, entry_points -else: - from importlib.metadata import EntryPoint, entry_points - # Set up logger logger = getLogger("murfey.server.api.clem")