lib/galaxy/managers/jobs.py (70 changes: 51 additions & 19 deletions)
@@ -12,6 +12,7 @@
cast,
Optional,
TYPE_CHECKING,
TypeVar,
Union,
)

@@ -118,7 +119,11 @@
from galaxy.work.context import WorkRequestContext

if TYPE_CHECKING:
from sqlalchemy.sql.expression import Select
from sqlalchemy.sql.expression import (
ColumnElement,
Label,
Select,
)

log = logging.getLogger(__name__)

@@ -134,7 +139,7 @@ class JobLock(BaseModel):
active: bool = Field(title="Job lock status", description="If active, jobs will not dispatch")


def get_path_key(path_tuple):
def get_path_key(path_tuple: tuple):
path_key = ""
tuple_elements = len(path_tuple)
for i, p in enumerate(path_tuple):
@@ -154,12 +159,15 @@ def get_path_key(path_tuple):


def safe_label_or_none(label: str) -> Optional[str]:
if label and len(label) > 63:
if len(label) > 63:
return None
return label


def safe_aliased(model_class, name=None):
T = TypeVar("T")


def safe_aliased(model_class: type[T], name: str) -> type[T]:
"""Create an aliased model class with a unique name."""
return aliased(model_class, name=safe_label_or_none(name))
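
For context, the 63-character cutoff in `safe_label_or_none` matches PostgreSQL's identifier length limit, where longer names are truncated and can collide; returning `None` lets SQLAlchemy generate its own unique anonymous alias instead. A minimal sketch of how the new `safe_aliased` helper might be used (the parameter path below is a made-up example, not taken from this diff):

```python
# Hypothetical usage of safe_aliased; `model` is Galaxy's model module.
short = safe_aliased(model.HistoryDatasetAssociation, name="input1_0")

# Very long names (e.g. derived from deeply nested tool parameter paths)
# exceed 63 characters, so the label is dropped and SQLAlchemy picks a
# unique anonymous alias name on its own.
long_path = "queries_0|input2|" + "x" * 80
anon = safe_aliased(model.HistoryDatasetAssociation, name=long_path)
```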

@@ -447,11 +455,11 @@ def by_tool_input(
job_state: Optional[JobStatesT] = (Job.states.OK,),
history_id: Union[int, None] = None,
require_name_match: bool = True,
):
) -> Union[Job, None]:
"""Search for jobs producing same results using the 'inputs' part of a tool POST."""
input_data = defaultdict(list)
input_data: dict[Any, list[dict[str, Any]]] = defaultdict(list)

def populate_input_data_input_id(path, key, value):
def populate_input_data_input_id(path: tuple, key, value) -> tuple[Any, Any]:
"""Traverses expanded incoming using remap and collects input_ids and input_data."""
if key == "id":
path_key = get_path_key(path[:-2])
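
The `populate_input_data_input_id` visitor above follows the `remap` callback shape: it is called with `(path, key, value)` and returns a `(key, value)` pair, which is what the new `tuple[Any, Any]` return annotation captures. A self-contained sketch of that traversal pattern, assuming `remap` here is `boltons.iterutils.remap`:

```python
from collections import defaultdict

from boltons.iterutils import remap  # assumed source of remap

input_data: dict = defaultdict(list)

def visit(path, key, value):
    # Collect every "id" encountered while walking the nested tool state.
    if key == "id":
        input_data[path[:-1]].append({"id": value})
    return key, value  # leave the structure unchanged

remap({"input1": {"values": [{"id": 42, "src": "hda"}]}}, visit)
# input_data now holds the collected id, keyed by the path to its container.
```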
@@ -499,13 +507,13 @@ def __search(
tool_id: str,
tool_version: Optional[str],
user: model.User,
input_data,
input_data: dict[Any, list[dict[str, Any]]],
job_state: Optional[JobStatesT],
param_dump: ToolStateDumpedToJsonInternalT,
wildcard_param_dump=None,
history_id: Union[int, None] = None,
require_name_match: bool = True,
):
) -> Union[Job, None]:
search_timer = ExecutionTimer()

def replace_dataset_ids(path, key, value):
@@ -525,15 +533,15 @@ def replace_dataset_ids(path, key, value):

stmt = select(model.Job.id.label("job_id"))

data_conditions: list = []
data_conditions: list[ColumnElement[bool]] = []

# We now build the stmt filters that relate to the input datasets
# that this job uses. We keep track of the requested dataset id in `requested_ids`,
# the type (hda, hdca or lda) in `data_types`
# and the ids that have been used in the job that has already been run in `used_ids`.
requested_ids = []
data_types = []
used_ids: list = []
used_ids: list[Label[int]] = []
for k, input_list in input_data.items():
# k will be matched against the JobParameter.name column. This can be prefixed depending on whether
# the input is in a repeat, or not (section and conditional)
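
The comment above describes the pattern that motivates the new annotations: per-input filters accumulate as boolean SQL expressions (`ColumnElement[bool]`) and the matched dataset ids as labeled columns (`Label[int]`), which the method later folds into the job-search SELECT. A rough, simplified sketch of that shape (not the actual Galaxy query):

```python
from sqlalchemy import and_, select
from sqlalchemy.orm import aliased

a = aliased(model.JobToInputDatasetAssociation)

# One labeled column per requested input dataset id...
labeled_col = a.dataset_id.label("input1_0")  # -> used_ids: list[Label[int]]

# ...and the conditions tying that input to the candidate job.
conditions = [  # -> data_conditions: list[ColumnElement[bool]]
    a.job_id == model.Job.id,
    a.name == "input1",
]

stmt = select(model.Job.id.label("job_id"), labeled_col).where(and_(*conditions))
```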
@@ -722,7 +730,7 @@ def _filter_jobs(

return stmt

def _exclude_jobs_with_deleted_outputs(self, stmt):
def _exclude_jobs_with_deleted_outputs(self, stmt: "Select[tuple[int]]") -> "Select":
subquery_alias = stmt.subquery("filtered_jobs_subquery")
outer_select_columns = [subquery_alias.c[col.name] for col in stmt.selected_columns]
outer_stmt = select(*outer_select_columns).select_from(subquery_alias)
@@ -767,14 +775,14 @@ def _exclude_jobs_with_deleted_outputs(self, stmt):
def _build_stmt_for_hda(
self,
stmt: "Select[tuple[int]]",
data_conditions: list,
used_ids: list,
data_conditions: list["ColumnElement[bool]"],
used_ids: list["Label[int]"],
k,
v,
identifier,
value_index: int,
require_name_match: bool = True,
):
) -> "Select[tuple[int]]":
a = aliased(model.JobToInputDatasetAssociation)
b = aliased(model.HistoryDatasetAssociation)
c = aliased(model.HistoryDatasetAssociation)
@@ -831,7 +839,15 @@ def _build_stmt_for_hda(
)
return stmt

def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v, value_index):
def _build_stmt_for_ldda(
self,
stmt: "Select[tuple[int]]",
data_conditions: list["ColumnElement[bool]"],
used_ids: list["Label[int]"],
k,
v,
value_index: int,
) -> "Select[tuple[int]]":
a = aliased(model.JobToInputLibraryDatasetAssociation)
label = safe_label_or_none(f"{k}_{value_index}")
labeled_col = a.ldda_id.label(label)
@@ -848,8 +864,15 @@ def agg_expression(self, column):
return func.array_agg(column, order_by=column)

def _build_stmt_for_hdca(
self, stmt, data_conditions, used_ids, k, v, user_id, value_index, require_name_match=True
):
self,
stmt: "Select[tuple[int]]",
data_conditions: list["ColumnElement[bool]"],
used_ids: list["Label[int]"],
k,
v,
user_id: int,
value_index: int,
) -> "Select[tuple[int]]":
# Strategy for efficiently finding equivalent HDCAs:
# 1. Determine the structural depth of the target HDCA by its collection_type.
# 2. For the target HDCA (identified by 'v'):
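
Step 1 of the strategy above relies on the structural depth of an HDCA being readable from its `collection_type` string. A hedged illustration, assuming Galaxy's colon-separated collection types (e.g. "list", "list:paired"); this helper is illustrative only and does not appear in the diff:

```python
def collection_depth(collection_type: str) -> int:
    # "list" -> 1 level of nesting, "list:paired" -> 2, "list:list:paired" -> 3.
    return len(collection_type.split(":"))

assert collection_depth("list") == 1
assert collection_depth("list:paired") == 2
```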
@@ -1088,7 +1111,16 @@ def _build_stmt_for_hdca(
data_conditions.append(a.name == k)
return stmt

def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, value_index):
def _build_stmt_for_dce(
self,
stmt: "Select[tuple[int]]",
data_conditions: list["ColumnElement[bool]"],
used_ids: list["Label[int]"],
k,
v,
user_id: int,
value_index: int,
) -> "Select[tuple[int]]":
dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v)

# Determine if the target DCE points to an HDA or a child collection