diff --git a/__workflows/SLURM_CellPose_Segmentation.py b/__workflows/SLURM_CellPose_Segmentation.py index 87cd65b..ae3a593 100644 --- a/__workflows/SLURM_CellPose_Segmentation.py +++ b/__workflows/SLURM_CellPose_Segmentation.py @@ -51,7 +51,7 @@ import logging # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" logger = logging.getLogger(__name__) @@ -102,7 +102,8 @@ def runScript(): _versions, _datafiles = slurmClient.get_image_versions_and_data_files( 'cellpose') - _workflow_params = slurmClient.get_workflow_parameters('cellpose') + _all_wf_params = slurmClient.get_workflow_parameters('cellpose') + _workflow_params = {k: v for k, v in _all_wf_params.items() if not v['file_attachment']} logger.debug(_workflow_params) name_descr = f"Name of folder where images are stored, as provided\ with {constants.IMAGE_EXPORT_SCRIPT}" @@ -138,8 +139,8 @@ def runScript(): values=versions) input_list.append(wf_v) for i, (k, param) in enumerate(wfparams.items()): - p = slurmClient.convert_cytype_to_omtype( - param["cytype"], + p = slurmClient.convert_param_type_to_omtype( + param["type"], param["default"], param["name"], description=param["description"], diff --git a/__workflows/SLURM_Run_Workflow.py b/__workflows/SLURM_Run_Workflow.py index cf7456f..2acb8c9 100644 --- a/__workflows/SLURM_Run_Workflow.py +++ b/__workflows/SLURM_Run_Workflow.py @@ -93,7 +93,7 @@ constants.workflow.OUTPUT_NEW_SCREEN, constants.workflow.OUTPUT_ATTACH, constants.workflow.OUTPUT_CSV_TABLE] -VERSION = "2.6.0" +VERSION = "2.7.0" def validate_importer_write_access(slurmClient: SlurmClient, conn: BlitzGateway, client: omscripts.client) -> None: @@ -367,6 +367,7 @@ def runScript(): (wf_versions, _) = slurmClient.get_all_image_versions_and_data_files() na = ["Not Available!"] _workflow_params = {} + _workflow_file_params = {} # file-attachment params keyed by param_id _workflow_available_versions = {} # All currently configured workflows workflows = wf_versions.keys() @@ -380,15 +381,16 @@ def runScript(): _workflow_available_versions[wf] = wf_versions.get( wf, na) # Get the workflow parameters (dynamically) from their repository - _workflow_params[wf] = slurmClient.get_workflow_parameters( - wf) - json_descriptor = slurmClient.pull_descriptor_from_github(wf) - wf_descr = json_descriptor['description'] + _all_wf_params = slurmClient.get_workflow_parameters(wf) + _workflow_params[wf] = {k: v for k, v in _all_wf_params.items() if not v['file_attachment']} + _workflow_file_params[wf] = {k: v for k, v in _all_wf_params.items() if v['file_attachment']} + descriptor = slurmClient.generic_descriptor_from_github(wf) + wf_descr = descriptor['description'] # Build value-choices lookup from the descriptor (scoped per wf, # so param name collisions across workflows are not an issue) value_choices_map = { inp['id']: [rstring(v) for v in inp['value-choices']] - for inp in json_descriptor.get('inputs', []) + for inp in descriptor.get('inputs', []) if inp.get('value-choices') } # Main parameter to select this workflow for execution @@ -405,15 +407,15 @@ def runScript(): # Create a script parameter for all workflow parameters for param_incr, (k, param) in enumerate(_workflow_params[ wf].items()): - # Convert the parameter from cy(tomine)type to om(ero)type - omtype_param = slurmClient.convert_cytype_to_omtype( - param["cytype"], + # Convert the parameter type to om(ero)type + omtype_param = slurmClient.convert_param_type_to_omtype( + param["type"], param["default"], param["name"], description=param["description"], default=param["default"], grouping=f"{parameter_group}.{param_incr+1}", - optional=param['optional'], + optional=True, # always optional: params from other workflows must not block this one **({"values": value_choices_map[k]} if k in value_choices_map else {}) ) # To allow 'duplicate' params, add the wf to uniqueify them @@ -421,6 +423,22 @@ def runScript(): # them to BIOMERO (as the wf will not understand these params) omtype_param._name = f"{wf}_|_{omtype_param._name}" input_list.append(omtype_param) + # File-attachment params: exposed as Long (OMERO FileAnnotation ID) + # They live under a FILE_ prefix so run_workflow can tell them apart. + num_reg = len(_workflow_params[wf]) + for fp_incr, (k, fp) in enumerate(_workflow_file_params[wf].items()): + fmt_str = ", ".join(fp['format']) if fp['format'] else "any" + fp_param = omscripts.Long( + f"{wf}_|_FILE_{k}", + optional=fp['optional'], + grouping=f"{parameter_group}.{num_reg + fp_incr + 1}", + description=( + f"[{fp['type'].capitalize()} attachment] {fp['description']}" + f" Accepted formats: {fmt_str}." + f" Provide the OMERO FileAnnotation ID." + ), + ) + input_list.append(fp_param) # Finish setting up the Omero script UI inputs = { f"{p._name}": p for p in input_list @@ -601,7 +619,9 @@ def runScript(): UI_messages, slurm_job_id, wf_id, task_id = run_workflow( slurmClient, _workflow_params[wf_name], + _workflow_file_params[wf_name], client, + conn, UI_messages, zipfile, email, @@ -616,12 +636,12 @@ def runScript(): while slurm_job_id_list: # Query all jobids we care about + job_status_dict = {} try: job_status_dict, _ = slurmClient.check_job_status( slurm_job_id_list) except Exception as e: - UI_messages += f" ERROR WITH JOB: {e}" - wf_failed = True + logger.warning(f"Transient error checking job status, will retry: {e}") for slurm_job_id, job_state in job_status_dict.items(): logger.debug(f"Job {slurm_job_id} is {job_state}.") @@ -751,7 +771,9 @@ def runScript(): def run_workflow(slurmClient: SlurmClient, workflow_params, + file_params, client, + conn, UI_messages: str, zipfile, email, @@ -761,14 +783,17 @@ def run_workflow(slurmClient: SlurmClient, Submits a named workflow to SLURM with user-specified parameters and monitors initial job submission status. Handles parameter extraction - from OMERO UI inputs and manages workflow tracking state. + from OMERO UI inputs, file-attachment transfers to HPC, and workflow + tracking state. Args: slurmClient (SlurmClient): Active SLURM client connection. - workflow_params: Dictionary of workflow-specific parameters. + workflow_params: Dictionary of regular workflow parameters. + file_params: Dictionary of file-attachment params (annotation IDs). client: OMERO script client for parameter access. + conn: OMERO BlitzGateway connection (needed for file transfers). UI_messages (str): Accumulated user interface messages. - zipfile: Name of input data file on SLURM. + zipfile: Name of input data file on SLURM (determines data_path). email: User email for job notifications. name: Workflow name to execute. wf_id: Workflow UUID for tracking. @@ -784,13 +809,86 @@ def run_workflow(slurmClient: SlurmClient, logger.info(f"Submitting workflow: {name}") workflow_version = unwrap(client.getInput(f"{name}_Version")) - # Extract workflow parameters + # Extract regular workflow parameters kwargs = {} for k in workflow_params: kwargs[k] = unwrap(client.getInput(f"{name}_|_{k}")) - logger.debug(f"Workflow parameters: {kwargs}") + # Transfer file-attachment inputs to HPC and resolve their Slurm paths + if file_params: + logger.info(''' + # ------------------------------------------------ + # :: 1b. Transfer file attachments to Slurm :: + # ------------------------------------------------ + ''') + svc = conn.getScriptService() + scripts = svc.getScripts() + file_transfer_scripts = [constants.FILE_TRANSFER_SCRIPT] + ft_matches = [(unwrap(s.id), unwrap(s.getName())) + for s in scripts if unwrap(s.getName()) in file_transfer_scripts] + if not ft_matches: + logger.warning( + f"File transfer script {file_transfer_scripts} not found — " + f"skipping file-attachment transfer for {name}." + ) + else: + ft_script_id, ft_script_name = ft_matches[0] + for param_id, fp in file_params.items(): + ann_id = unwrap(client.getInput(f"{name}_|_FILE_{param_id}")) + if ann_id is None: + if not fp['optional']: + err = f"Required file attachment '{param_id}' has no annotation ID — cannot run workflow." + logger.error(err) + raise ValueError(err) + logger.debug(f"No annotation ID supplied for optional param '{param_id}', skipping.") + continue + logger.info(f"Transferring file attachment {ann_id} for param '{param_id}'") + ft_inputs = { + constants.file_transfer.FILE_ANNOTATION_ID: rlong(ann_id), + constants.file_transfer.FOLDER: rstring(zipfile), + constants.CLEANUP: client.getInput(constants.CLEANUP) or rbool(True), + } + try: + ft_task_id = slurmClient.workflowTracker.add_task_to_workflow( + wf_id, ft_script_name, VERSION, + ann_id, {k: unwrap(v) for k, v in ft_inputs.items()} + ) + slurmClient.workflowTracker.start_task(ft_task_id) + except Exception as db_e: + logger.error(f"DB error adding file-transfer task: {db_e}") + raise + ft_rv, _ = runOMEROScript(client, svc, ft_script_id, ft_inputs, + slurmClient=slurmClient) + slurm_path = unwrap(ft_rv.get('Slurm_Path')) if ft_rv else None + ft_msg = unwrap(ft_rv.get('Message', None)) if ft_rv else '' + if slurm_path: + kwargs[param_id] = slurm_path + UI_messages += f"Transferred {fp['type']} '{param_id}' to {slurm_path}. " + slurmClient.workflowTracker.complete_task(ft_task_id, ft_msg or slurm_path) + else: + err = f"File transfer for '{param_id}' (ann {ann_id}) returned no path: {ft_msg}" + logger.warning(err) + slurmClient.workflowTracker.fail_task(ft_task_id, err) + if not fp.get('optional', True): + raise ValueError(err) + + logger.debug(f"Workflow parameters (incl. file paths): {kwargs}") try: + # Pre-flight: verify the job script exists on Slurm before submitting + configured_job = slurmClient.slurm_model_jobs.get(name.lower()) + job_script = ( + f"{slurmClient.slurm_script_path}/{configured_job}" + if configured_job + else f"{slurmClient.slurm_script_path}/jobs/{name}.sh" + ) + check_result = slurmClient.run(f'test -f "{job_script}"', warn=True) + if check_result.exited != 0: + raise FileNotFoundError( + f"Job script not found on Slurm: {job_script}. " + f"Please generate the job script for '{name}' first " + f"(use 'Validate Slurm Setup' or check your slurm-scripts repo)." + ) + cp_result, slurm_job_id, wf_id, task_id = slurmClient.run_workflow( workflow_name=name, workflow_version=workflow_version, diff --git a/__workflows/SLURM_Run_Workflow_Batched.py b/__workflows/SLURM_Run_Workflow_Batched.py index a2049b0..725c7ec 100644 --- a/__workflows/SLURM_Run_Workflow_Batched.py +++ b/__workflows/SLURM_Run_Workflow_Batched.py @@ -71,7 +71,7 @@ constants.workflow.OUTPUT_CSV_TABLE] # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" def runScript(): @@ -213,6 +213,7 @@ def runScript(): (wf_versions, _) = slurmClient.get_all_image_versions_and_data_files() na = ["Not Available!"] _workflow_params = {} + _workflow_file_params = {} _workflow_available_versions = {} # All currently configured workflows workflows = wf_versions.keys() @@ -226,16 +227,17 @@ def runScript(): _workflow_available_versions[wf] = wf_versions.get( wf, na) # Get the workflow parameters (dynamically) from their repository - _workflow_params[wf] = slurmClient.get_workflow_parameters( - wf) + _all_wf_params = slurmClient.get_workflow_parameters(wf) + _workflow_params[wf] = {k: v for k, v in _all_wf_params.items() if not v['file_attachment']} + _workflow_file_params[wf] = {k: v for k, v in _all_wf_params.items() if v['file_attachment']} # Main parameter to select this workflow for execution - json_descriptor = slurmClient.pull_descriptor_from_github(wf) - wf_descr = json_descriptor['description'] + descriptor = slurmClient.generic_descriptor_from_github(wf) + wf_descr = descriptor['description'] # Build value-choices lookup from the descriptor (scoped per wf, # so param name collisions across workflows are not an issue) value_choices_map = { inp['id']: [rstring(v) for v in inp['value-choices']] - for inp in json_descriptor.get('inputs', []) + for inp in descriptor.get('inputs', []) if inp.get('value-choices') } wf_ = omscripts.Bool(wf, grouping=parameter_group, default=False, @@ -251,9 +253,9 @@ def runScript(): # Create a script parameter for all workflow parameters for param_incr, (k, param) in enumerate(_workflow_params[ wf].items()): - # Convert the parameter from cy(tomine)type to om(ero)type - omtype_param = slurmClient.convert_cytype_to_omtype( - param["cytype"], + # Convert the parameter type to om(ero)type + omtype_param = slurmClient.convert_param_type_to_omtype( + param["type"], param["default"], param["name"], description=param["description"], @@ -267,6 +269,22 @@ def runScript(): # them to BIOMERO (as the wf will not understand these params) omtype_param._name = f"{wf}_|_{omtype_param._name}" input_list.append(omtype_param) + # File-attachment params: exposed as Long (OMERO FileAnnotation ID) + # They live under a FILE_ prefix so SLURM_Run_Workflow can tell them apart. + num_reg = len(_workflow_params[wf]) + for fp_incr, (k, fp) in enumerate(_workflow_file_params[wf].items()): + fmt_str = ", ".join(fp['format']) if fp['format'] else "any" + fp_param = omscripts.Long( + f"{wf}_|_FILE_{k}", + optional=fp['optional'], + grouping=f"{parameter_group}.{num_reg + fp_incr + 1}", + description=( + f"[{fp['type'].capitalize()} attachment] {fp['description']}" + f" Accepted formats: {fmt_str}." + f" Provide the OMERO FileAnnotation ID." + ), + ) + input_list.append(fp_param) # Finish setting up the Omero script UI inputs = { p._name: p for p in input_list diff --git a/_data/SLURM_Get_Results.py b/_data/SLURM_Get_Results.py index b7ce3f5..0ef3561 100644 --- a/_data/SLURM_Get_Results.py +++ b/_data/SLURM_Get_Results.py @@ -75,7 +75,7 @@ from omero_metadata.populate import ParsingContext # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" OBJECT_TYPES = ( 'Plate', diff --git a/_data/SLURM_Get_Update.py b/_data/SLURM_Get_Update.py index d89e863..377ff9a 100644 --- a/_data/SLURM_Get_Update.py +++ b/_data/SLURM_Get_Update.py @@ -54,7 +54,7 @@ import sys # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" logger = logging.getLogger(__name__) diff --git a/_data/SLURM_Import_Results.py b/_data/SLURM_Import_Results.py index 0e7e54f..7b44f08 100644 --- a/_data/SLURM_Import_Results.py +++ b/_data/SLURM_Import_Results.py @@ -138,7 +138,7 @@ def _mask_url(url): return url # Fallback function if import fails "IMPORTER_ENABLED is false - dataset imports will not be supported") # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" OBJECT_TYPES = ( 'Plate', diff --git a/_data/SLURM_Remote_Conversion.py b/_data/SLURM_Remote_Conversion.py index 8825b76..8bac400 100644 --- a/_data/SLURM_Remote_Conversion.py +++ b/_data/SLURM_Remote_Conversion.py @@ -49,7 +49,7 @@ CONV_OPTIONS_TARGET = ['tiff', 'zarr'] # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" def runScript(): diff --git a/_data/_SLURM_File_Transfer.py b/_data/_SLURM_File_Transfer.py new file mode 100644 index 0000000..9b5de8c --- /dev/null +++ b/_data/_SLURM_File_Transfer.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (C) 2024 T T Luik +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +BIOMERO SLURM File Attachment Transfer Script + +Transfers a single OMERO FileAnnotation to a SLURM cluster, placing it +under the correct parameter input directory so the workflow container +receives it as a CLI flag. + +The destination on SLURM is: + {slurm_data_path}/{Folder_Name}/data/in/{filename} + +The resolved absolute SLURM path is returned as the ``Slurm_Path`` output, +which ``SLURM_Run_Workflow.py`` injects as the CLI argument value for the +corresponding workflow parameter. + +Inputs: + Annotation_ID (Long): OMERO FileAnnotation ID supplied by the user. + Folder_Name (String): The job data folder on SLURM (same value used + when the images were transferred, e.g. ``SLURM_IMAGES_1234567890``). + Cleanup? (Bool, optional): Remove the local temp file after transfer + (default True). + +Outputs: + Slurm_Path (String): Absolute SLURM path of the transferred file. + Message (String): Human-readable status / error message. + +Authors: Torec Luik +Institutions: Amsterdam UMC +Contact: cellularimaging@amsterdamumc.nl +""" + +import os +import sys +import logging +import logging.handlers +import tempfile + +import omero.scripts as scripts +import omero +from omero.gateway import BlitzGateway +from omero.rtypes import rstring, rlong + +from biomero import SlurmClient, constants + +logger = logging.getLogger(__name__) + +VERSION = "2.7.0" + + +# --------------------------------------------------------------------------- +# Core transfer logic +# --------------------------------------------------------------------------- + +def transfer_file_to_slurm( + conn: BlitzGateway, + annotation_id: int, + folder_name: str, + slurmClient: SlurmClient, + cleanup: bool = True, +) -> tuple[str, str]: + """Download an OMERO FileAnnotation and upload it to the SLURM cluster. + + Args: + conn: Active OMERO BlitzGateway connection. + annotation_id: ID of the OMERO FileAnnotation to transfer. + folder_name: Name of the job data folder on SLURM. Must match the + folder name used for the image transfer so both end up in the + same data/in/ directory. Created automatically if absent. + slurmClient: Authenticated SlurmClient instance. + cleanup: If True, delete the local temp file after transfer. + + Returns: + (slurm_path, message): Absolute path of the file on SLURM and a + human-readable status message. + + Raises: + ValueError: If the annotation or its underlying file cannot be found. + Exception: On SSH / SCP failure. + """ + # ------------------------------------------------------------------ + # 1. Fetch the FileAnnotation and its underlying OriginalFile + # ------------------------------------------------------------------ + ann = conn.getObject("FileAnnotation", annotation_id) + if ann is None: + raise ValueError( + f"FileAnnotation {annotation_id} not found or not accessible." + ) + orig_file = ann.getFile() + if orig_file is None: + raise ValueError( + f"FileAnnotation {annotation_id} has no associated OriginalFile." + ) + filename = orig_file.getName() + file_size = orig_file.getSize() + logger.info( + f"Downloading FileAnnotation {annotation_id} " + f"('{filename}', {file_size} bytes) …" + ) + + # ------------------------------------------------------------------ + # 2. Download file bytes to a local temp file + # ------------------------------------------------------------------ + tmp_dir = tempfile.mkdtemp(prefix="biomero_ft_") + local_path = os.path.join(tmp_dir, filename) + try: + with open(local_path, "wb") as fh: + for chunk in ann.getFileInChunks(): + fh.write(chunk) + logger.info(f"File saved locally to {local_path}") + + # ------------------------------------------------------------------ + # 3. Create destination directory on SLURM + # ------------------------------------------------------------------ + dest_dir = f"{slurmClient.slurm_data_path}/{folder_name}/data/in" + mkdir_result = slurmClient.run_commands( + [f'mkdir -p "{dest_dir}"'] + ) + if not mkdir_result.ok: + raise Exception( + f"Failed to create remote directory '{dest_dir}': " + f"{mkdir_result.stderr}" + ) + logger.info(f"Remote directory ready: {dest_dir}") + + # ------------------------------------------------------------------ + # 4. Transfer file to SLURM + # ------------------------------------------------------------------ + slurmClient.put(local=local_path, remote=dest_dir) + slurm_path = f"{dest_dir}/{filename}" + logger.info(f"File transferred to SLURM: {slurm_path}") + message = ( + f"Successfully transferred '{filename}' to SLURM " + f"into '{folder_name}/data/in/'." + ) + return slurm_path, message + + finally: + if cleanup and os.path.exists(local_path): + os.remove(local_path) + os.rmdir(tmp_dir) + logger.debug(f"Cleaned up local temp file {local_path}") + + +# --------------------------------------------------------------------------- +# OMERO script entry point +# --------------------------------------------------------------------------- + +def run_script(): + """OMERO script entry point for _SLURM_File_Transfer.""" + + with SlurmClient.from_config() as slurmClient: + + client = scripts.client( + "_SLURM_File_Transfer", + f"""Transfer an OMERO FileAnnotation to the SLURM cluster. + +Places the file under: + {{slurm_data_path}}/{{Folder_Name}}/data/in/{{filename}} + +Returns the resolved SLURM path so SLURM_Run_Workflow can pass it as a +CLI flag to the workflow container. + +This runs a script remotely on your SLURM cluster. +Connection ready? {slurmClient.validate()}""", + + scripts.Long( + constants.file_transfer.FILE_ANNOTATION_ID, + optional=False, + grouping="1", + description="OMERO FileAnnotation ID of the file to transfer.", + ), + + scripts.String( + constants.file_transfer.FOLDER, + optional=False, + grouping="2", + description=( + "SLURM job data folder name (e.g. SLURM_IMAGES_1234567890). " + "Must match the folder name used for the image transfer so " + "images and file attachments share the same data/in/ directory. " + "The directory is created automatically if it does not yet exist." + ), + ), + + scripts.Bool( + constants.CLEANUP, + grouping="3", + description=( + "Remove the local temporary file after transfer. " + "Uncheck for debugging." + ), + default=True, + ), + + version=VERSION, + authors=["Torec Luik"], + institutions=["Amsterdam UMC"], + contact="cellularimaging@amsterdamumc.nl", + authorsInstitutions=[[1]], + namespaces=[omero.constants.namespaces.NSDYNAMIC], + ) + + try: + conn = BlitzGateway(client_obj=client) + script_params = client.getInputs(unwrap=True) + + annotation_id = script_params[constants.file_transfer.FILE_ANNOTATION_ID] + folder_name = script_params[constants.file_transfer.FOLDER] + cleanup = script_params.get(constants.CLEANUP, True) + + slurm_path, message = transfer_file_to_slurm( + conn=conn, + annotation_id=annotation_id, + folder_name=folder_name, + slurmClient=slurmClient, + cleanup=cleanup, + ) + + client.setOutput("Message", rstring(message)) + client.setOutput("Slurm_Path", rstring(slurm_path)) + + except Exception as exc: + logger.exception("File transfer failed") + client.setOutput("Message", rstring(f"ERROR: {exc}")) + + finally: + client.closeSession() + + +if __name__ == "__main__": + # Some defaults from OMERO; don't feel like reading ice files. + # Retrieve the value of the OMERODIR environment variable + OMERODIR = os.environ.get('OMERODIR', '/opt/omero/server/OMERO.server') + LOGDIR = os.path.join(OMERODIR, 'var', 'log') + LOGFORMAT = "%(asctime)s %(levelname)-5.5s [%(name)40s] " \ + "[%(process)d] (%(threadName)-10s) %(message)s" + # Added the process id + LOGSIZE = 500000000 + LOGNUM = 9 + log_filename = 'biomero.log' + # Create a stream handler with INFO level (for OMERO.web output) + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setLevel(logging.INFO) + # Create DEBUG logging to rotating logfile at var/log + logging.basicConfig(level=logging.DEBUG, + format=LOGFORMAT, + handlers=[ + stream_handler, + logging.handlers.RotatingFileHandler( + os.path.join(LOGDIR, log_filename), + maxBytes=LOGSIZE, + backupCount=LOGNUM) + ]) + + # Silence some of the DEBUG - Extended for cleaner BIOMERO logs + logging.getLogger('omero.gateway.utils').setLevel(logging.WARNING) + logging.getLogger('omero.gateway').setLevel(logging.WARNING) # Silences proxy creation spam + logging.getLogger('omero.client').setLevel(logging.WARNING) + logging.getLogger('paramiko.transport').setLevel(logging.WARNING) + logging.getLogger('paramiko.sftp').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('requests').setLevel(logging.WARNING) + logging.getLogger('requests_cache').setLevel(logging.WARNING) # Cache logs + logging.getLogger('requests-cache').setLevel(logging.WARNING) # Alt naming + logging.getLogger('requests_cache.core').setLevel(logging.WARNING) # Core module + logging.getLogger('requests_cache.backends').setLevel(logging.WARNING) + logging.getLogger('requests_cache.backends.base').setLevel(logging.WARNING) + logging.getLogger('requests_cache.backends.sqlite').setLevel( + logging.WARNING) + logging.getLogger('requests_cache.policy').setLevel(logging.WARNING) + logging.getLogger('requests_cache.policy.actions').setLevel( + logging.WARNING) + logging.getLogger('invoke').setLevel(logging.WARNING) + logging.getLogger('fabric').setLevel(logging.WARNING) # SSH operations + logging.getLogger('Ice').setLevel(logging.ERROR) + logging.getLogger('ZeroC').setLevel(logging.ERROR) + + run_script() diff --git a/_data/_SLURM_Image_Transfer.py b/_data/_SLURM_Image_Transfer.py index 86d2152..e87fc87 100644 --- a/_data/_SLURM_Image_Transfer.py +++ b/_data/_SLURM_Image_Transfer.py @@ -76,7 +76,7 @@ logger = logging.getLogger(__name__) # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" # keep track of log strings. log_strings = [] @@ -104,7 +104,7 @@ def compress(target, base): target (str): Name of the zip file to write (e.g., "folder.zip"). base (str): Name of folder to zip up (e.g., "folder"). """ - base_name, ext = target.split(".") + base_name, ext = target.rsplit(".", 1) shutil.make_archive(base_name, ext, base) diff --git a/admin/Example_Minimal_Slurm_Script.py b/admin/Example_Minimal_Slurm_Script.py new file mode 100644 index 0000000..187fb2b --- /dev/null +++ b/admin/Example_Minimal_Slurm_Script.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Original work Copyright (C) 2014 University of Dundee +# & Open Microscopy Environment. +# All Rights Reserved. +# Modified work Copyright 2022 Torec Luik, Amsterdam UMC +# Use is subject to license terms supplied in LICENSE.txt +# + +from __future__ import print_function +from omero.rtypes import rstring +import omero.scripts as omscripts +import omero +import re +import subprocess +from biomero import SlurmClient +from omero.gateway import BlitzGateway +import logging +import os +import sys + +logger = logging.getLogger(__name__) + +_MAX_CMD_LENGTH = 500 + +# Block destructive or dangerous command patterns even for admins +_DANGEROUS_PATTERNS = [ + r"\brm\b", # rm (remove files) + r"\brmdir\b", # rmdir + r"\bmkfs\b", # format filesystem + r"\bdd\b", # disk dump/overwrite + r"\bkill\b", # kill processes + r"\bpkill\b", # kill by name + r"\bscancel\b", # cancel Slurm jobs + r"\breboot\b", # reboot node + r"\bshutdown\b", # shutdown node + r"\bpoweroff\b", # poweroff node + r"\btruncate\b", # truncate files + r"\bchmod\b", # change permissions + r"\bchown\b", # change ownership + r">\s*/", # redirect to root path + r"\|\s*sh\b", # pipe to shell + r"\|\s*bash\b", # pipe to bash + r"\beval\b", # eval arbitrary code + r"\bexec\b", # replace process +] + + +def _check_command_safety(cmd: str) -> str | None: + """Return an error message if the command matches a dangerous pattern.""" + if len(cmd) > _MAX_CMD_LENGTH: + return (f"Command too long ({len(cmd)} chars, max {_MAX_CMD_LENGTH}). " + f"Refusing to run.") + for pattern in _DANGEROUS_PATTERNS: + if re.search(pattern, cmd, re.IGNORECASE): + return (f"Command blocked: matches dangerous pattern '{pattern}'. " + f"If you need this, run it directly on the Slurm cluster.") + return None + + +_RUNSLRM = "Check_SLURM_Status" +_SQUEUE = "Check_Queue" +_SINFO = "Check_Cluster" +_SOTHER = "Run_Other_Command" +_SCMD = "Linux_Command" +_DEFAULT_SCMD = "ls -la" + +VERSION = "2.7.0" + + +def runScript(): + """ + The main entry point of the script + """ + client = omscripts.client( + 'Slurm SSH Command (Admin Only)', + '''Run SSH commands on the Slurm cluster. + + **ADMIN ONLY**: This script requires OMERO administrator privileges. + ''', + omscripts.Bool(_RUNSLRM, grouping="01", default=True, + description="Run Slurm status commands"), + omscripts.Bool(_SQUEUE, grouping="01.1", default=True, + description="Show job queue (squeue -u $USER)"), + omscripts.Bool(_SINFO, grouping="01.2", default=False, + description="Show cluster info (sinfo)"), + omscripts.Bool(_SOTHER, grouping="02", default=False, + description="Run a custom Linux command on Slurm"), + omscripts.String(_SCMD, optional=True, grouping="02.1", + description="The Linux command to run on Slurm", + default=_DEFAULT_SCMD), + namespaces=[omero.constants.namespaces.NSDYNAMIC], + version=VERSION, + authors=["Torec Luik"], + institutions=["Amsterdam UMC"], + contact='cellularimaging@amsterdamumc.nl', + authorsInstitutions=[[1]] + ) + + try: + conn = BlitzGateway(client_obj=client) + user = conn.getUser() + is_admin = user.isAdmin() + user_id = conn.getUserId() + + logger.info(f"User ID {user_id} admin status: {is_admin}") + + if not is_admin: + logger.warning("Access denied: Admin privileges required") + client.setOutput("Message", rstring( + f"ACCESS DENIED: This script requires OMERO administrator " + f"privileges. User ID {user_id} is not an admin." + )) + return + + scriptParams = client.getInputs(unwrap=True) + logger.info(f"Params: {scriptParams}") + + user_name = user.getName() + + with SlurmClient.from_config() as slurmClient: + logger.info(f"Slurm connection valid: {slurmClient.validate()}") + + cmdlist = [] + if scriptParams.get(_RUNSLRM): + if scriptParams.get(_SQUEUE): + cmdlist.append("squeue -u $USER") + if scriptParams.get(_SINFO): + cmdlist.append("sinfo") + if scriptParams.get(_SOTHER): + custom_cmd = scriptParams.get(_SCMD, "").strip() + safety_error = _check_command_safety(custom_cmd) + if safety_error: + logger.warning( + f"BLOCKED command from admin user {user_name} " + f"(id={user_id}): '{custom_cmd}' — {safety_error}" + ) + client.setOutput("Message", rstring( + f"Command blocked: {safety_error}" + )) + return + cmdlist.append(custom_cmd) + + print_result = [] + try: + for cmd in cmdlist: + # Audit: log who ran what + logger.info( + f"AUDIT: user={user_name} (id={user_id}) " + f"running command: {cmd}" + ) + results = slurmClient.run(cmd) + logger.info(f"Ran slurm: {results}") + print_result.append(str(results)) + except Exception as e: + logger.error(f"Command error: {e}") + print_result.append(f"Error: {e}") + + client.setOutput("Message", rstring("\n".join(print_result))) + + finally: + client.closeSession() + + +if __name__ == '__main__': + # Some defaults from OMERO; don't feel like reading ice files. + # Retrieve the value of the OMERODIR environment variable + OMERODIR = os.environ.get('OMERODIR', '/opt/omero/server/OMERO.server') + LOGDIR = os.path.join(OMERODIR, 'var', 'log') + LOGFORMAT = "%(asctime)s %(levelname)-5.5s [%(name)40s] " \ + "[%(process)d] (%(threadName)-10s) %(message)s" + # Added the process id + LOGSIZE = 500000000 + LOGNUM = 9 + log_filename = 'biomero.log' + # Create a stream handler with INFO level (for OMERO.web output) + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setLevel(logging.INFO) + # Create DEBUG logging to rotating logfile at var/log + logging.basicConfig(level=logging.DEBUG, + format=LOGFORMAT, + handlers=[ + stream_handler, + logging.handlers.RotatingFileHandler( + os.path.join(LOGDIR, log_filename), + maxBytes=LOGSIZE, + backupCount=LOGNUM) + ]) + + # Silence some of the DEBUG + logging.getLogger('omero.gateway.utils').setLevel(logging.WARNING) + logging.getLogger('paramiko.transport').setLevel(logging.WARNING) + + runScript() \ No newline at end of file diff --git a/admin/SLURM_Init_environment.py b/admin/SLURM_Init_environment.py index 825cd15..dc17a9a 100644 --- a/admin/SLURM_Init_environment.py +++ b/admin/SLURM_Init_environment.py @@ -37,7 +37,7 @@ import sys logger = logging.getLogger(__name__) -VERSION = "2.6.0" +VERSION = "2.7.0" def runScript(): diff --git a/admin/SLURM_check_setup.py b/admin/SLURM_check_setup.py index f692c40..b9c5164 100644 --- a/admin/SLURM_check_setup.py +++ b/admin/SLURM_check_setup.py @@ -58,7 +58,7 @@ import pkg_resources # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" logger = logging.getLogger(__name__) diff --git a/admin/Tail_logs.py b/admin/Tail_logs.py index 3b6aad3..12ce06e 100755 --- a/admin/Tail_logs.py +++ b/admin/Tail_logs.py @@ -26,7 +26,7 @@ _DEFAULT_TAIL_LINES = 5000 # Version constant for easy version management -VERSION = "2.6.0" +VERSION = "2.7.0" logger = logging.getLogger(__name__)