Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions bioblend/galaxy/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ def invoke_workflow(
require_exact_tool_versions: bool = True,
version: Optional[int] = None,
use_cached_job: bool = False,
parameters: Optional[dict] = None,
instance: bool = False,
resource_params: Optional[dict[str, Any]] = None,
preferred_object_store_id: Optional[str] = None,
preferred_intermediate_object_store_id: Optional[str] = None,
preferred_outputs_object_store_id: Optional[str] = None,
) -> dict[str, Any]:
"""
Invoke the workflow identified by ``workflow_id``. This will
Expand Down Expand Up @@ -351,6 +357,7 @@ def invoke_workflow(

:type params: dict
:param params: A mapping of non-datasets tool parameters (see below)
A synonym to the "parameters" dict below. Both cannot be provided.

:type history_id: str
:param history_id: The encoded history ID where to store the workflow
Expand Down Expand Up @@ -401,6 +408,33 @@ def invoke_workflow(
:param use_cached_job: Whether to use cached jobs for the workflow
invocation.

:type parameters: dict
:param parameters: A mapping of non-datasets tool parameters (see below)
A synonym to the "params" dict above. Both cannot be provided.

:type instance: bool
:param instance: treat ``workflow_id`` as a Workflow ID if True,
otherwise treat it as a StoredWorkflow ID (the default). This
parameter works only on Galaxy 21.05 or later.

:type resource_params: dict
:param resource_params: A dictionary containing the resource parameters
to be used for this workflow run.

:type preferred_object_store_id: str
:param preferred_object_store_id: The object store id where you want all
outputs of this workflow run be stored.

:type preferred_intermediate_object_store_id: str
:param preferred_intermediate_object_store_id: The object store id where
you want the intermediate outputs of this workflow run to be stored.
Cannot be set if ``preferred_object_store_id`` is set.

:type preferred_outputs_object_store_id: str
:param preferred_outputs_object_store_id: The object store id where
you want the priamry outputs of this workflow run to be stored.
Cannot be set if ``preferred_object_store_id`` is set.

:rtype: dict
:return: A dict containing the workflow invocation describing the
scheduling of the workflow. For example::
Expand Down Expand Up @@ -514,16 +548,35 @@ def invoke_workflow(
"require_exact_tool_versions": require_exact_tool_versions,
"version": version,
"use_cached_job": use_cached_job,
"instance": instance,
}
if inputs:
payload["inputs"] = inputs

if params:
if params and parameters:
raise ValueError("You may specify either 'parameters' or 'params' but not both.")
elif parameters:
payload["parameters"] = parameters
elif params:
payload["parameters"] = params

split_object_store_config = (
preferred_outputs_object_store_id is not None or preferred_intermediate_object_store_id is not None
)
if split_object_store_config and preferred_object_store_id:
raise ValueError(
"You may specify either 'preferred_object_store_id' or one/both of 'preferred_outputs_object_store_id' and 'preferred_intermediate_object_store_id' but not both"
)

if preferred_object_store_id:
payload["preferred_object_store_id"] = preferred_object_store_id
if preferred_intermediate_object_store_id:
payload["preferred_intermediate_object_store_id"] = preferred_intermediate_object_store_id
if preferred_outputs_object_store_id:
payload["preferred_outputs_object_store_id"] = preferred_outputs_object_store_id

if inputs:
payload["inputs"] = inputs
if replacement_params:
payload["replacement_params"] = replacement_params

if history_id:
payload["history"] = f"hist_id={history_id}"
elif history_name:
Expand All @@ -534,6 +587,9 @@ def invoke_workflow(
payload["inputs_by"] = inputs_by
if parameters_normalized:
payload["parameters_normalized"] = parameters_normalized
if resource_params:
payload["resource_params"] = resource_params

url = self._invocations_url(workflow_id)
return self._post(payload, url=url)

Expand Down