diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index 40b706cbd5cd..3ada69cffdf9 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -2242,14 +2242,28 @@ def fail(message=job.info, exception=None):
             if self.app.config.enable_celery_tasks:
                 from galaxy.celery.tasks import compute_dataset_hash
 
-                extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None
                 request = ComputeDatasetHashTaskRequest(
                     dataset_id=dataset.id,
-                    extra_files_path=extra_files_path,
+                    extra_files_path=None,
                     hash_function=self.app.config.hash_function,
                 )
                 compute_dataset_hash.delay(request=request)
+                # For composite datasets with extra files, hash each extra file individually
+                if dataset.extra_files_path_exists():
+                    for root, _, files in os.walk(dataset.extra_files_path):
+                        for file in files:
+                            file_path = os.path.join(root, file)
+                            if os.path.exists(file_path):
+                                # Calculate relative path from extra_files_path
+                                relative_path = os.path.relpath(file_path, dataset.extra_files_path)
+                                request = ComputeDatasetHashTaskRequest(
+                                    dataset_id=dataset.id,
+                                    extra_files_path=relative_path,
+                                    hash_function=self.app.config.hash_function,
+                                )
+                                compute_dataset_hash.delay(request=request)
+
 
         user = job.user
         if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
             user.adjust_total_disk_usage(collected_bytes, quota_source_info.label)
diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py
index 61e723554411..1f8a8051162b 100644
--- a/lib/galaxy/managers/jobs.py
+++ b/lib/galaxy/managers/jobs.py
@@ -172,6 +172,68 @@ def safe_aliased(model_class: type[T], name: str) -> type[T]:
     return aliased(model_class, name=safe_label_or_none(name))
 
 
+def has_same_hash(
+    stmt: "Select[tuple[int]]", a: type[model.HistoryDatasetAssociation], b: type[model.HistoryDatasetAssociation]
+) -> "Select[tuple[int]]":
+    a_hash = aliased(model.DatasetHash)
+    b_hash = aliased(model.DatasetHash)
+    b_hash_total = aliased(model.DatasetHash)
+
+    # Join b directly, checking for either direct dataset match or hash match
+    # The hash match uses a correlated subquery to avoid the expensive cartesian product
+    stmt = stmt.join(
+        b,
+        or_(
+            # Direct dataset match
+            b.dataset_id == a.dataset_id,
+            # Hash match: b's dataset has hashes that match all of a's hashes
+            # For composite datasets, this means matching the primary file hash AND all extra file hashes
+            # For regular datasets, this means matching the single primary file hash
+            b.dataset_id.in_(
+                select(b_hash.dataset_id)
+                .select_from(a_hash)
+                .join(
+                    b_hash,
+                    and_(
+                        a_hash.hash_function == b_hash.hash_function,
+                        a_hash.hash_value == b_hash.hash_value,
+                        # Match extra_files_path: both NULL or both the same path
+                        or_(
+                            and_(
+                                a_hash.extra_files_path.is_(None),
+                                b_hash.extra_files_path.is_(None),
+                            ),
+                            a_hash.extra_files_path == b_hash.extra_files_path,
+                        ),
+                    ),
+                )
+                .where(a_hash.dataset_id == a.dataset_id)
+                # Group by b's dataset_id and ensure all of a's hashes are matched
+                .group_by(b_hash.dataset_id)
+                .having(
+                    and_(
+                        # Number of matched hashes equals total hashes in A
+                        func.count(b_hash.id)
+                        == select(func.count(model.DatasetHash.id))
+                        .where(model.DatasetHash.dataset_id == a.dataset_id)
+                        .correlate(a)
+                        .scalar_subquery(),
+                        # Total hashes in B equals total hashes in A (ensures no extra hashes in B)
+                        select(func.count(b_hash_total.id))
+                        .where(b_hash_total.dataset_id == b_hash.dataset_id)
+                        .scalar_subquery()
+                        == select(func.count(model.DatasetHash.id))
+                        .where(model.DatasetHash.dataset_id == a.dataset_id)
+                        .correlate(a)
+                        .scalar_subquery(),
+                    )
+                )
+            ),
+        ),
+    )
+    return stmt
+
+
 class JobManager:
     def __init__(self, app: StructuredApp):
         self.app = app
@@ -793,7 +855,8 @@ def _build_stmt_for_hda(
             used_ids.append(labeled_col)
         stmt = stmt.join(a, a.job_id == model.Job.id)
         # b is the HDA used for the job
-        stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
+        stmt = stmt.join(b, a.dataset_id == b.id)
+        stmt = has_same_hash(stmt, b, c)
         name_condition = []
         hda_history_join_conditions = [
             e.history_dataset_association_id == b.id,
diff --git a/lib/galaxy_test/api/test_datasets.py b/lib/galaxy_test/api/test_datasets.py
index a97cfe5885b9..c48b42be1fd7 100644
--- a/lib/galaxy_test/api/test_datasets.py
+++ b/lib/galaxy_test/api/test_datasets.py
@@ -789,10 +789,12 @@ def test_compute_md5_on_primary_dataset(self, history_id):
         hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
         self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")
 
-    def test_compute_sha1_on_composite_dataset(self, history_id):
+    def test_compute_sha256_on_composite_dataset_by_default(self, history_id):
         output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
-        self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
         hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
+        self.assert_hash_value(
+            hda_details, "94e09ae129f1ec32d1736af833160e8bdaa3a75cef2982712076c7bcd7d155d3", "SHA-256"
+        )
         self.assert_hash_value(
             hda_details,
             "3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22",
diff --git a/lib/galaxy_test/api/test_tool_execute.py b/lib/galaxy_test/api/test_tool_execute.py
index 59018a376680..c16efa74060e 100644
--- a/lib/galaxy_test/api/test_tool_execute.py
+++ b/lib/galaxy_test/api/test_tool_execute.py
@@ -7,7 +7,9 @@
 files, etc..).
 """
 
+import copy
 from dataclasses import dataclass
+from typing import Any
 
 import pytest
 
@@ -599,6 +601,63 @@ def test_map_over_data_param_with_list_of_lists(target_history: TargetHistory, r
     execute.assert_creates_implicit_collection(0)
 
 
+@requires_tool_id("gx_data")
+def test_job_cache_with_dataset_hash(target_history: TargetHistory, required_tool: RequiredTool):
+    hda = target_history.with_dataset("1\t2\t3", "dataset1")
+    execute = required_tool.execute().with_inputs({"parameter": hda.src_dict})
+    execute._assert_executed_ok()
+    new_hda = target_history.with_dataset("1\t2\t3", "dataset1")
+    execute = required_tool.execute(use_cached_job=True).with_inputs({"parameter": new_hda.src_dict})
+    execution = execute.assert_has_single_job
+    assert execution.final_details["copied_from_job_id"]
+
+
+@requires_tool_id("gx_data")
+def test_job_cache_with_extra_files(target_history: TargetHistory, required_tool: RequiredTool) -> None:
+    # Upload a composite dataset (velvet) which creates extra files
+    velvet_upload_request: dict[str, Any] = {
+        "src": "composite",
+        "ext": "velvet",
+        "composite": {
+            "items": [
+                {"src": "pasted", "paste_content": "sequences content"},
+                {"src": "pasted", "paste_content": "roadmaps content"},
+                {"src": "pasted", "paste_content": "log content"},
+            ]
+        },
+    }
+
+    # Upload first velvet dataset - access the private _dataset_populator
+    velvet1_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_upload_request, wait=True)
+    velvet1 = {"src": "hda", "id": velvet1_hda["id"]}
+
+    # Run gx_data tool on the first velvet dataset
+    _ = required_tool.execute().with_inputs({"parameter": velvet1}).assert_has_single_job
+
+    # Upload the same velvet dataset a second time
+    velvet2_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_upload_request, wait=True)
+    velvet2 = {"src": "hda", "id": velvet2_hda["id"]}
+
+    # Run gx_data on the second velvet dataset with job cache enabled
+    job = required_tool.execute(use_cached_job=True).with_inputs({"parameter": velvet2}).assert_has_single_job
+
+    # Job cache should be used when all hashes match
+    assert job.final_details["copied_from_job_id"]
+
+    # Upload a third velvet dataset with modified content in one of the extra files
+    velvet_modified_request = copy.deepcopy(velvet_upload_request)
+    velvet_modified_request["composite"]["items"][1]["paste_content"] = "roadmaps content MODIFIED"
+
+    velvet3_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_modified_request, wait=True)
+    velvet3 = {"src": "hda", "id": velvet3_hda["id"]}
+
+    # Run gx_data on the third velvet dataset with job cache enabled
+    job3 = required_tool.execute(use_cached_job=True).with_inputs({"parameter": velvet3}).assert_has_single_job
+
+    # Job cache should NOT be used when hashes don't match completely
+    assert not job3.final_details["copied_from_job_id"]
+
+
 @requires_tool_id("gx_repeat_boolean_min")
 def test_optional_repeats_with_mins_filled_id(target_history: TargetHistory, required_tool: RequiredTool):
     # we have a tool test for this but I wanted to verify it wasn't just the
@@ -723,14 +782,36 @@ def test_null_to_text_tool_with_validation(required_tool: RequiredTool, tool_inp
     required_tool.execute().with_inputs(tool_input_format.when.any({"parameter": ""})).assert_fails()
 
 
-@requires_tool_id("cat|cat1")
-def test_deferred_basic(required_tool: RequiredTool, target_history: TargetHistory):
-    has_src_dict = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
+def _run_deferred(
+    required_tool: RequiredTool,
+    target_history: TargetHistory,
+    use_cached_job: bool = False,
+    expect_cached_job: bool = False,
+    include_correct_hash: bool = True,
+) -> None:
+    has_src_dict = target_history.with_deferred_dataset_for_test_file(
+        "1.bed", ext="bed", include_correct_hash=include_correct_hash
+    )
     inputs = {
         "input1": has_src_dict.src_dict,
     }
-    output = required_tool.execute().with_inputs(inputs).assert_has_single_job.with_single_output
+    job = required_tool.execute(use_cached_job=use_cached_job).with_inputs(inputs).assert_has_single_job
+    output = job.with_single_output
     output.assert_contains("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
+    if use_cached_job:
+        assert bool(job.final_details["copied_from_job_id"]) == expect_cached_job
+
+
+@requires_tool_id("cat|cat1")
+def test_deferred_with_cached_input(required_tool: RequiredTool, target_history: TargetHistory) -> None:
+    # Basic deferred dataset
+    _run_deferred(required_tool, target_history)
+    # Should just work because input is deferred
+    _run_deferred(required_tool, target_history, use_cached_job=True, expect_cached_job=True)
+    # Should fail to use the cached job because we don't have a source hash for deduplicating the materialized dataset
+    _run_deferred(
+        required_tool, target_history, use_cached_job=True, expect_cached_job=False, include_correct_hash=False
+    )
 
 
 @requires_tool_id("metadata_bam")
diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py
index fe315426c2fa..ff890ac7413d 100644
--- a/lib/galaxy_test/base/populators.py
+++ b/lib/galaxy_test/base/populators.py
@@ -605,14 +605,24 @@ def fetch_hda(self, history_id: str, item: dict[str, Any], wait: bool = True) ->
         assert len(hdas) == 1
         return hdas[0]
 
-    def create_deferred_hda(self, history_id: str, uri: str, ext: Optional[str] = None) -> dict[str, Any]:
-        item = {
+    def create_deferred_hda(
+        self,
+        history_id: str,
+        uri: str,
+        name: Union[str, None] = None,
+        ext: Optional[str] = None,
+        hashes: Union[list[dict[str, str]], None] = None,
+    ) -> dict[str, Any]:
+        item: dict[str, Any] = {
+            "name": name,
             "src": "url",
             "url": uri,
             "deferred": True,
         }
         if ext:
             item["ext"] = ext
+        if hashes:
+            item["hashes"] = hashes
         output = self.fetch_hda(history_id, item)
         details = self.get_history_dataset_details(history_id, dataset=output)
         return details
@@ -620,15 +630,7 @@ def create_deferred_hda(self, history_id: str, uri: str, ext: Optional[str] = No
     def create_deferred_hda_with_hash(self, history_id: str, content: str) -> dict[str, Any]:
         url = f"base64://{base64.b64encode(content.encode()).decode()}"
         hashes = [{"hash_function": "SHA-1", "hash_value": hashlib.sha1(content.encode()).hexdigest()}]
-        item = {
-            "ext": "txt",
-            "src": "url",
-            "url": url,
-            "hashes": hashes,
-            "deferred": True,
-        }
-        output = self.fetch_hda(history_id, item)
-        details = self.get_history_dataset_details(history_id, dataset=output)
+        details = self.create_deferred_hda(history_id, url, ext="txt", hashes=hashes)
         return details
 
     def export_dataset_to_remote_file(self, history_id: str, content: str, name: str, target_uri: str):
@@ -1833,9 +1835,12 @@ def wait_on_download_request(self, storage_request_id: UUID) -> Response:
     def base64_url_for_string(self, content: str) -> str:
         return self.base64_url_for_bytes(content.encode("utf-8"))
 
-    def base64_url_for_test_file(self, test_filename: str) -> str:
+    def contents_for_test_file(self, test_filename: str) -> bytes:
         test_data_resolver = TestDataResolver()
-        file_contents = open(test_data_resolver.get_filename(test_filename), "rb").read()
+        return open(test_data_resolver.get_filename(test_filename), "rb").read()
+
+    def base64_url_for_test_file(self, test_filename: str) -> str:
+        file_contents = self.contents_for_test_file(test_filename)
         return self.base64_url_for_bytes(file_contents)
 
     def base64_url_for_bytes(self, content: bytes) -> str:
@@ -4080,7 +4085,7 @@ def __init__(self, dataset_populator: BaseDatasetPopulator, history_id: str, job
     def _wait_for(self):
         if self._final_details is None:
             self._dataset_populator.wait_for_job(self._job_id, assert_ok=False)
-            self._final_details = self._dataset_populator.get_job_details(self._job_id).json()
+            self._final_details = self._dataset_populator.get_job_details(self._job_id, full=True).json()
 
     @property
     def final_details(self) -> dict[str, Any]:
@@ -4489,27 +4494,34 @@ def with_dataset(
     def with_deferred_dataset(
         self,
         uri: str,
-        named: Optional[str] = None,
+        name: Optional[str] = None,
         ext: Optional[str] = None,
+        hashes: Union[list[dict[str, str]], None] = None,
     ) -> "HasSrcDict":
-        kwd = {}
-        if named is not None:
-            kwd["name"] = named
         new_dataset = self._dataset_populator.create_deferred_hda(
             history_id=self._history_id,
             uri=uri,
+            name=name,
             ext=ext,
+            hashes=hashes,
         )
         return HasSrcDict("hda", new_dataset)
 
     def with_deferred_dataset_for_test_file(
         self,
         filename: str,
-        named: Optional[str] = None,
+        name: Optional[str] = None,
         ext: Optional[str] = None,
+        include_correct_hash: bool = True,
     ) -> "HasSrcDict":
-        base64_url = self._dataset_populator.base64_url_for_test_file(filename)
-        return self.with_deferred_dataset(base64_url, named=named, ext=ext)
+        file_contents = self._dataset_populator.contents_for_test_file(filename)
+        base64_url = self._dataset_populator.base64_url_for_bytes(file_contents)
+        hashes = (
+            [{"hash_function": "SHA-1", "hash_value": hashlib.sha1(file_contents).hexdigest()}]
+            if include_correct_hash
+            else None
+        )
+        return self.with_deferred_dataset(base64_url, name=name, ext=ext, hashes=hashes)
 
     def with_unpaired(self) -> "HasSrcDict":
         return self._fetch_response(