Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions __workflows/SLURM_CellPose_Segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import logging

# Version constant for easy version management
VERSION = "2.6.0"
VERSION = "2.7.0"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,7 +102,8 @@ def runScript():

_versions, _datafiles = slurmClient.get_image_versions_and_data_files(
'cellpose')
_workflow_params = slurmClient.get_workflow_parameters('cellpose')
_all_wf_params = slurmClient.get_workflow_parameters('cellpose')
_workflow_params = {k: v for k, v in _all_wf_params.items() if not v['file_attachment']}
logger.debug(_workflow_params)
name_descr = f"Name of folder where images are stored, as provided\
with {constants.IMAGE_EXPORT_SCRIPT}"
Expand Down Expand Up @@ -138,8 +139,8 @@ def runScript():
values=versions)
input_list.append(wf_v)
for i, (k, param) in enumerate(wfparams.items()):
p = slurmClient.convert_cytype_to_omtype(
param["cytype"],
p = slurmClient.convert_param_type_to_omtype(
param["type"],
param["default"],
param["name"],
description=param["description"],
Expand Down
132 changes: 115 additions & 17 deletions __workflows/SLURM_Run_Workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
constants.workflow.OUTPUT_NEW_SCREEN,
constants.workflow.OUTPUT_ATTACH,
constants.workflow.OUTPUT_CSV_TABLE]
VERSION = "2.6.0"
VERSION = "2.7.0"


def validate_importer_write_access(slurmClient: SlurmClient, conn: BlitzGateway, client: omscripts.client) -> None:
Expand Down Expand Up @@ -367,6 +367,7 @@ def runScript():
(wf_versions, _) = slurmClient.get_all_image_versions_and_data_files()
na = ["Not Available!"]
_workflow_params = {}
_workflow_file_params = {} # file-attachment params keyed by param_id
_workflow_available_versions = {}
# All currently configured workflows
workflows = wf_versions.keys()
Expand All @@ -380,15 +381,16 @@ def runScript():
_workflow_available_versions[wf] = wf_versions.get(
wf, na)
# Get the workflow parameters (dynamically) from their repository
_workflow_params[wf] = slurmClient.get_workflow_parameters(
wf)
json_descriptor = slurmClient.pull_descriptor_from_github(wf)
wf_descr = json_descriptor['description']
_all_wf_params = slurmClient.get_workflow_parameters(wf)
_workflow_params[wf] = {k: v for k, v in _all_wf_params.items() if not v['file_attachment']}
_workflow_file_params[wf] = {k: v for k, v in _all_wf_params.items() if v['file_attachment']}
descriptor = slurmClient.generic_descriptor_from_github(wf)
wf_descr = descriptor['description']
# Build value-choices lookup from the descriptor (scoped per wf,
# so param name collisions across workflows are not an issue)
value_choices_map = {
inp['id']: [rstring(v) for v in inp['value-choices']]
for inp in json_descriptor.get('inputs', [])
for inp in descriptor.get('inputs', [])
if inp.get('value-choices')
}
# Main parameter to select this workflow for execution
Expand All @@ -405,22 +407,38 @@ def runScript():
# Create a script parameter for all workflow parameters
for param_incr, (k, param) in enumerate(_workflow_params[
wf].items()):
# Convert the parameter from cy(tomine)type to om(ero)type
omtype_param = slurmClient.convert_cytype_to_omtype(
param["cytype"],
# Convert the parameter type to om(ero)type
omtype_param = slurmClient.convert_param_type_to_omtype(
param["type"],
param["default"],
param["name"],
description=param["description"],
default=param["default"],
grouping=f"{parameter_group}.{param_incr+1}",
optional=param['optional'],
optional=True, # always optional: params from other workflows must not block this one
**({"values": value_choices_map[k]} if k in value_choices_map else {})
)
# To allow 'duplicate' params, add the wf to uniqueify them
# we have to remove this prefix later again, before passing
# them to BIOMERO (as the wf will not understand these params)
omtype_param._name = f"{wf}_|_{omtype_param._name}"
input_list.append(omtype_param)
# File-attachment params: exposed as Long (OMERO FileAnnotation ID)
# They live under a FILE_ prefix so run_workflow can tell them apart.
num_reg = len(_workflow_params[wf])
for fp_incr, (k, fp) in enumerate(_workflow_file_params[wf].items()):
fmt_str = ", ".join(fp['format']) if fp['format'] else "any"
fp_param = omscripts.Long(
f"{wf}_|_FILE_{k}",
optional=fp['optional'],
grouping=f"{parameter_group}.{num_reg + fp_incr + 1}",
description=(
f"[{fp['type'].capitalize()} attachment] {fp['description']}"
f" Accepted formats: {fmt_str}."
f" Provide the OMERO FileAnnotation ID."
),
)
input_list.append(fp_param)
# Finish setting up the Omero script UI
inputs = {
f"{p._name}": p for p in input_list
Expand Down Expand Up @@ -601,7 +619,9 @@ def runScript():
UI_messages, slurm_job_id, wf_id, task_id = run_workflow(
slurmClient,
_workflow_params[wf_name],
_workflow_file_params[wf_name],
client,
conn,
UI_messages,
zipfile,
email,
Expand All @@ -616,12 +636,12 @@ def runScript():

while slurm_job_id_list:
# Query all jobids we care about
job_status_dict = {}
try:
job_status_dict, _ = slurmClient.check_job_status(
slurm_job_id_list)
except Exception as e:
UI_messages += f" ERROR WITH JOB: {e}"
wf_failed = True
logger.warning(f"Transient error checking job status, will retry: {e}")

for slurm_job_id, job_state in job_status_dict.items():
logger.debug(f"Job {slurm_job_id} is {job_state}.")
Expand Down Expand Up @@ -751,7 +771,9 @@ def runScript():

def run_workflow(slurmClient: SlurmClient,
workflow_params,
file_params,
client,
conn,
UI_messages: str,
zipfile,
email,
Expand All @@ -761,14 +783,17 @@ def run_workflow(slurmClient: SlurmClient,

Submits a named workflow to SLURM with user-specified parameters and
monitors initial job submission status. Handles parameter extraction
from OMERO UI inputs and manages workflow tracking state.
from OMERO UI inputs, file-attachment transfers to HPC, and workflow
tracking state.

Args:
slurmClient (SlurmClient): Active SLURM client connection.
workflow_params: Dictionary of workflow-specific parameters.
workflow_params: Dictionary of regular workflow parameters.
file_params: Dictionary of file-attachment params (annotation IDs).
client: OMERO script client for parameter access.
conn: OMERO BlitzGateway connection (needed for file transfers).
UI_messages (str): Accumulated user interface messages.
zipfile: Name of input data file on SLURM.
zipfile: Name of input data file on SLURM (determines data_path).
email: User email for job notifications.
name: Workflow name to execute.
wf_id: Workflow UUID for tracking.
Expand All @@ -784,13 +809,86 @@ def run_workflow(slurmClient: SlurmClient,
logger.info(f"Submitting workflow: {name}")
workflow_version = unwrap(client.getInput(f"{name}_Version"))

# Extract workflow parameters
# Extract regular workflow parameters
kwargs = {}
for k in workflow_params:
kwargs[k] = unwrap(client.getInput(f"{name}_|_{k}"))

logger.debug(f"Workflow parameters: {kwargs}")
# Transfer file-attachment inputs to HPC and resolve their Slurm paths
if file_params:
logger.info('''
# ------------------------------------------------
# :: 1b. Transfer file attachments to Slurm ::
# ------------------------------------------------
''')
svc = conn.getScriptService()
scripts = svc.getScripts()
file_transfer_scripts = [constants.FILE_TRANSFER_SCRIPT]
ft_matches = [(unwrap(s.id), unwrap(s.getName()))
for s in scripts if unwrap(s.getName()) in file_transfer_scripts]
if not ft_matches:
logger.warning(
f"File transfer script {file_transfer_scripts} not found — "
f"skipping file-attachment transfer for {name}."
)
else:
ft_script_id, ft_script_name = ft_matches[0]
for param_id, fp in file_params.items():
ann_id = unwrap(client.getInput(f"{name}_|_FILE_{param_id}"))
if ann_id is None:
if not fp['optional']:
err = f"Required file attachment '{param_id}' has no annotation ID — cannot run workflow."
logger.error(err)
raise ValueError(err)
logger.debug(f"No annotation ID supplied for optional param '{param_id}', skipping.")
continue
logger.info(f"Transferring file attachment {ann_id} for param '{param_id}'")
ft_inputs = {
constants.file_transfer.FILE_ANNOTATION_ID: rlong(ann_id),
constants.file_transfer.FOLDER: rstring(zipfile),
constants.CLEANUP: client.getInput(constants.CLEANUP) or rbool(True),
}
try:
ft_task_id = slurmClient.workflowTracker.add_task_to_workflow(
wf_id, ft_script_name, VERSION,
ann_id, {k: unwrap(v) for k, v in ft_inputs.items()}
)
slurmClient.workflowTracker.start_task(ft_task_id)
except Exception as db_e:
logger.error(f"DB error adding file-transfer task: {db_e}")
raise
ft_rv, _ = runOMEROScript(client, svc, ft_script_id, ft_inputs,
slurmClient=slurmClient)
slurm_path = unwrap(ft_rv.get('Slurm_Path')) if ft_rv else None
ft_msg = unwrap(ft_rv.get('Message', None)) if ft_rv else ''
if slurm_path:
kwargs[param_id] = slurm_path
UI_messages += f"Transferred {fp['type']} '{param_id}' to {slurm_path}. "
slurmClient.workflowTracker.complete_task(ft_task_id, ft_msg or slurm_path)
else:
err = f"File transfer for '{param_id}' (ann {ann_id}) returned no path: {ft_msg}"
logger.warning(err)
slurmClient.workflowTracker.fail_task(ft_task_id, err)
if not fp.get('optional', True):
raise ValueError(err)

logger.debug(f"Workflow parameters (incl. file paths): {kwargs}")
try:
# Pre-flight: verify the job script exists on Slurm before submitting
configured_job = slurmClient.slurm_model_jobs.get(name.lower())
job_script = (
f"{slurmClient.slurm_script_path}/{configured_job}"
if configured_job
else f"{slurmClient.slurm_script_path}/jobs/{name}.sh"
)
check_result = slurmClient.run(f'test -f "{job_script}"', warn=True)
if check_result.exited != 0:
raise FileNotFoundError(
f"Job script not found on Slurm: {job_script}. "
f"Please generate the job script for '{name}' first "
f"(use 'Validate Slurm Setup' or check your slurm-scripts repo)."
)

cp_result, slurm_job_id, wf_id, task_id = slurmClient.run_workflow(
workflow_name=name,
workflow_version=workflow_version,
Expand Down
36 changes: 27 additions & 9 deletions __workflows/SLURM_Run_Workflow_Batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
constants.workflow.OUTPUT_CSV_TABLE]

# Version constant for easy version management
VERSION = "2.6.0"
VERSION = "2.7.0"


def runScript():
Expand Down Expand Up @@ -213,6 +213,7 @@ def runScript():
(wf_versions, _) = slurmClient.get_all_image_versions_and_data_files()
na = ["Not Available!"]
_workflow_params = {}
_workflow_file_params = {}
_workflow_available_versions = {}
# All currently configured workflows
workflows = wf_versions.keys()
Expand All @@ -226,16 +227,17 @@ def runScript():
_workflow_available_versions[wf] = wf_versions.get(
wf, na)
# Get the workflow parameters (dynamically) from their repository
_workflow_params[wf] = slurmClient.get_workflow_parameters(
wf)
_all_wf_params = slurmClient.get_workflow_parameters(wf)
_workflow_params[wf] = {k: v for k, v in _all_wf_params.items() if not v['file_attachment']}
_workflow_file_params[wf] = {k: v for k, v in _all_wf_params.items() if v['file_attachment']}
# Main parameter to select this workflow for execution
json_descriptor = slurmClient.pull_descriptor_from_github(wf)
wf_descr = json_descriptor['description']
descriptor = slurmClient.generic_descriptor_from_github(wf)
wf_descr = descriptor['description']
# Build value-choices lookup from the descriptor (scoped per wf,
# so param name collisions across workflows are not an issue)
value_choices_map = {
inp['id']: [rstring(v) for v in inp['value-choices']]
for inp in json_descriptor.get('inputs', [])
for inp in descriptor.get('inputs', [])
if inp.get('value-choices')
}
wf_ = omscripts.Bool(wf, grouping=parameter_group, default=False,
Expand All @@ -251,9 +253,9 @@ def runScript():
# Create a script parameter for all workflow parameters
for param_incr, (k, param) in enumerate(_workflow_params[
wf].items()):
# Convert the parameter from cy(tomine)type to om(ero)type
omtype_param = slurmClient.convert_cytype_to_omtype(
param["cytype"],
# Convert the parameter type to om(ero)type
omtype_param = slurmClient.convert_param_type_to_omtype(
param["type"],
param["default"],
param["name"],
description=param["description"],
Expand All @@ -267,6 +269,22 @@ def runScript():
# them to BIOMERO (as the wf will not understand these params)
omtype_param._name = f"{wf}_|_{omtype_param._name}"
input_list.append(omtype_param)
# File-attachment params: exposed as Long (OMERO FileAnnotation ID)
# They live under a FILE_ prefix so SLURM_Run_Workflow can tell them apart.
num_reg = len(_workflow_params[wf])
for fp_incr, (k, fp) in enumerate(_workflow_file_params[wf].items()):
fmt_str = ", ".join(fp['format']) if fp['format'] else "any"
fp_param = omscripts.Long(
f"{wf}_|_FILE_{k}",
optional=fp['optional'],
grouping=f"{parameter_group}.{num_reg + fp_incr + 1}",
description=(
f"[{fp['type'].capitalize()} attachment] {fp['description']}"
f" Accepted formats: {fmt_str}."
f" Provide the OMERO FileAnnotation ID."
),
)
input_list.append(fp_param)
# Finish setting up the Omero script UI
inputs = {
p._name: p for p in input_list
Expand Down
2 changes: 1 addition & 1 deletion _data/SLURM_Get_Results.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
from omero_metadata.populate import ParsingContext

# Version constant for easy version management
VERSION = "2.6.0"
VERSION = "2.7.0"

OBJECT_TYPES = (
'Plate',
Expand Down
2 changes: 1 addition & 1 deletion _data/SLURM_Get_Update.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import sys

# Version constant for easy version management
VERSION = "2.6.0"
VERSION = "2.7.0"

logger = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion _data/SLURM_Import_Results.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def _mask_url(url): return url # Fallback function if import fails
"IMPORTER_ENABLED is false - dataset imports will not be supported")

# Version constant for easy version management
VERSION = "2.6.0"
VERSION = "2.7.0"

OBJECT_TYPES = (
'Plate',
Expand Down
2 changes: 1 addition & 1 deletion _data/SLURM_Remote_Conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
CONV_OPTIONS_TARGET = ['tiff', 'zarr']

# Version constant for easy version management
VERSION = "2.6.0"
VERSION = "2.7.0"


def runScript():
Expand Down
Loading