18 changes: 16 additions & 2 deletions lib/galaxy/jobs/__init__.py
@@ -2242,14 +2242,28 @@ def fail(message=job.info, exception=None):
if self.app.config.enable_celery_tasks:
from galaxy.celery.tasks import compute_dataset_hash

extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None
request = ComputeDatasetHashTaskRequest(
dataset_id=dataset.id,
extra_files_path=extra_files_path,
extra_files_path=None,
hash_function=self.app.config.hash_function,
)
compute_dataset_hash.delay(request=request)

# For composite datasets with extra files, hash each extra file individually
if dataset.extra_files_path_exists():
@mvdbeek (Member, Author) commented on Jan 26, 2026:

These can be thousands of files; can we at least smash this into a single task? And maybe record a hash of hashes so the query won't have thousands of comparisons?
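A minimal sketch of that suggestion, assuming a hypothetical helper that a single batched task could call (the helper name and return shape are illustrative; the real compute_dataset_hash / ComputeDatasetHashTaskRequest interface shown in this diff is per-file):

```python
import hashlib
import os


def hash_extra_files_in_one_pass(extra_files_path: str, hash_function: str = "sha256") -> tuple[dict[str, str], str]:
    """Hash every extra file once and fold the digests into a "hash of hashes".

    Hypothetical helper body for a single batched task; not an existing Galaxy API.
    """
    per_file: dict[str, str] = {}
    for root, _, files in os.walk(extra_files_path):
        for name in files:
            file_path = os.path.join(root, name)
            hasher = hashlib.new(hash_function)
            with open(file_path, "rb") as fh:
                for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                    hasher.update(chunk)
            relative_path = os.path.relpath(file_path, extra_files_path)
            per_file[relative_path] = hasher.hexdigest()

    # Deterministic "hash of hashes": sort by relative path so the combined
    # digest does not depend on os.walk ordering.
    combined = hashlib.new(hash_function)
    for relative_path in sorted(per_file):
        combined.update(relative_path.encode("utf-8"))
        combined.update(per_file[relative_path].encode("utf-8"))
    return per_file, combined.hexdigest()
```

Storing the single combined digest alongside (or instead of) the per-file rows is what would let the cache query below compare one value per dataset.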

for root, _, files in os.walk(dataset.extra_files_path):
for file in files:
file_path = os.path.join(root, file)
if os.path.exists(file_path):
# Calculate relative path from extra_files_path
relative_path = os.path.relpath(file_path, dataset.extra_files_path)
request = ComputeDatasetHashTaskRequest(
dataset_id=dataset.id,
extra_files_path=relative_path,
hash_function=self.app.config.hash_function,
)
compute_dataset_hash.delay(request=request)

user = job.user
if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
user.adjust_total_disk_usage(collected_bytes, quota_source_info.label)
65 changes: 64 additions & 1 deletion lib/galaxy/managers/jobs.py
@@ -172,6 +172,68 @@ def safe_aliased(model_class: type[T], name: str) -> type[T]:
return aliased(model_class, name=safe_label_or_none(name))


def has_same_hash(
stmt: "Select[tuple[int]]", a: type[model.HistoryDatasetAssociation], b: type[model.HistoryDatasetAssociation]
) -> "Select[tuple[int]]":
a_hash = aliased(model.DatasetHash)
b_hash = aliased(model.DatasetHash)
b_hash_total = aliased(model.DatasetHash)

# Join b directly, checking for either direct dataset match or hash match
# The hash match uses a correlated subquery to avoid the expensive cartesian product
stmt = stmt.join(
b,
or_(
# Direct dataset match
b.dataset_id == a.dataset_id,
# Hash match: b's dataset has hashes that match all of a's hashes
# For composite datasets, this means matching the primary file hash AND all extra file hashes
# For regular datasets, this means matching the single primary file hash
b.dataset_id.in_(
select(b_hash.dataset_id)
.select_from(a_hash)
.join(
b_hash,
and_(
a_hash.hash_function == b_hash.hash_function,
a_hash.hash_value == b_hash.hash_value,
# Match extra_files_path: both NULL or both the same path
or_(
and_(
a_hash.extra_files_path.is_(None),
b_hash.extra_files_path.is_(None),
),
a_hash.extra_files_path == b_hash.extra_files_path,
),
),
)
.where(a_hash.dataset_id == a.dataset_id)
# Group by b's dataset_id and ensure all of a's hashes are matched
.group_by(b_hash.dataset_id)
.having(
and_(
# Number of matched hashes equals total hashes in A
@mvdbeek (Member, Author):

I am so worried about this bringing down the database...

Reviewer (Member):

What if we also calculate and store in the database a "combined_hash", i.e. a hash of all the hashes of the dataset's primary and extra files? We could use a similar approach for a dataset collection (whose hash would be the combined hash of the combined hashes of its components). Then we would be able to simplify the database query.
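A minimal sketch of this combined_hash idea, assuming the per-file hashes are already available; neither the helper nor a combined_hash column exists in Galaxy today:

```python
import hashlib
from typing import Optional


def combined_hash(per_file_hashes: list[tuple[Optional[str], str]], hash_function: str = "sha256") -> str:
    """Fold a dataset's (extra_files_path, hash_value) pairs into one digest.

    Illustrative only: extra_files_path is None for the primary file, and sorting
    makes the result independent of the order in which hashes were recorded.
    """
    digest = hashlib.new(hash_function)
    for extra_files_path, hash_value in sorted(per_file_hashes, key=lambda h: (h[0] or "", h[1])):
        digest.update((extra_files_path or "").encode("utf-8"))
        digest.update(b"\x00")  # separator keeps path/value boundaries unambiguous
        digest.update(hash_value.encode("utf-8"))
        digest.update(b"\x00")
    return digest.hexdigest()
```

With such a value stored per dataset (and, for collections, a combined hash of the elements' combined hashes), the grouped counting subqueries in has_same_hash could in principle reduce to a single equality comparison.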

@mvdbeek (Member, Author):

Can extra files change when you change the datatype? I think they can. I don't know if it's wise to include the extra-file handling in a first pass; it is complex, and I'm not sure that even just including the hashes isn't a large query penalty.

Reviewer (Member):

It's the primary file that changes for some datatypes (e.g. Rgenetics) when setting metadata; not sure about the extra files.

func.count(b_hash.id)
== select(func.count(model.DatasetHash.id))
.where(model.DatasetHash.dataset_id == a.dataset_id)
.correlate(a)
.scalar_subquery(),
# Total hashes in B equals total hashes in A (ensures no extra hashes in B)
select(func.count(b_hash_total.id))
.where(b_hash_total.dataset_id == b_hash.dataset_id)
.scalar_subquery()
== select(func.count(model.DatasetHash.id))
.where(model.DatasetHash.dataset_id == a.dataset_id)
.correlate(a)
.scalar_subquery(),
)
)
),
),
)
return stmt


class JobManager:
def __init__(self, app: StructuredApp):
self.app = app
@@ -793,7 +855,8 @@ def _build_stmt_for_hda(
used_ids.append(labeled_col)
stmt = stmt.join(a, a.job_id == model.Job.id)
# b is the HDA used for the job
stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
stmt = stmt.join(b, a.dataset_id == b.id)
stmt = has_same_hash(stmt, b, c)
name_condition = []
hda_history_join_conditions = [
e.history_dataset_association_id == b.id,
6 changes: 4 additions & 2 deletions lib/galaxy_test/api/test_datasets.py
@@ -789,10 +789,12 @@ def test_compute_md5_on_primary_dataset(self, history_id):
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")

def test_compute_sha1_on_composite_dataset(self, history_id):
def test_compute_sha256_on_composite_dataset_by_default(self, history_id):
output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
self.assert_hash_value(
hda_details, "94e09ae129f1ec32d1736af833160e8bdaa3a75cef2982712076c7bcd7d155d3", "SHA-256"
)
self.assert_hash_value(
hda_details,
"3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22",
89 changes: 85 additions & 4 deletions lib/galaxy_test/api/test_tool_execute.py
@@ -7,7 +7,9 @@
files, etc..).
"""

import copy
from dataclasses import dataclass
from typing import Any

import pytest

@@ -599,6 +601,63 @@ def test_map_over_data_param_with_list_of_lists(target_history: TargetHistory, r
execute.assert_creates_implicit_collection(0)


@requires_tool_id("gx_data")
def test_job_cache_with_dataset_hash(target_history: TargetHistory, required_tool: RequiredTool):
hda = target_history.with_dataset("1\t2\t3", "dataset1")
execute = required_tool.execute().with_inputs({"parameter": hda.src_dict})
execute._assert_executed_ok()
new_hda = target_history.with_dataset("1\t2\t3", "dataset1")
execute = required_tool.execute(use_cached_job=True).with_inputs({"parameter": new_hda.src_dict})
execution = execute.assert_has_single_job
assert execution.final_details["copied_from_job_id"]


@requires_tool_id("gx_data")
def test_job_cache_with_extra_files(target_history: TargetHistory, required_tool: RequiredTool) -> None:
# Upload a composite dataset (velvet) which creates extra files
velvet_upload_request: dict[str, Any] = {
"src": "composite",
"ext": "velvet",
"composite": {
"items": [
{"src": "pasted", "paste_content": "sequences content"},
{"src": "pasted", "paste_content": "roadmaps content"},
{"src": "pasted", "paste_content": "log content"},
]
},
}

# Upload first velvet dataset - access the private _dataset_populator
velvet1_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_upload_request, wait=True)
velvet1 = {"src": "hda", "id": velvet1_hda["id"]}

# Run gx_data tool on the first velvet dataset
_ = required_tool.execute().with_inputs({"parameter": velvet1}).assert_has_single_job

# Upload the same velvet dataset a second time
velvet2_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_upload_request, wait=True)
velvet2 = {"src": "hda", "id": velvet2_hda["id"]}

# Run gx_data on the second velvet dataset with job cache enabled
job = required_tool.execute(use_cached_job=True).with_inputs({"parameter": velvet2}).assert_has_single_job

# Job cache should be used when all hashes match
assert job.final_details["copied_from_job_id"]

# Upload a third velvet dataset with modified content in one of the extra files
velvet_modified_request = copy.deepcopy(velvet_upload_request)
velvet_modified_request["composite"]["items"][1]["paste_content"] = "roadmaps content MODIFIED"

velvet3_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_modified_request, wait=True)
velvet3 = {"src": "hda", "id": velvet3_hda["id"]}

# Run gx_data on the third velvet dataset with job cache enabled
job3 = required_tool.execute(use_cached_job=True).with_inputs({"parameter": velvet3}).assert_has_single_job

# Job cache should NOT be used when hashes don't match completely
assert not job3.final_details["copied_from_job_id"]


@requires_tool_id("gx_repeat_boolean_min")
def test_optional_repeats_with_mins_filled_id(target_history: TargetHistory, required_tool: RequiredTool):
# we have a tool test for this but I wanted to verify it wasn't just the
@@ -723,14 +782,36 @@ def test_null_to_text_tool_with_validation(required_tool: RequiredTool, tool_inp
required_tool.execute().with_inputs(tool_input_format.when.any({"parameter": ""})).assert_fails()


@requires_tool_id("cat|cat1")
def test_deferred_basic(required_tool: RequiredTool, target_history: TargetHistory):
has_src_dict = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
def _run_deferred(
required_tool: RequiredTool,
target_history: TargetHistory,
use_cached_job: bool = False,
expect_cached_job: bool = False,
include_correct_hash: bool = True,
) -> None:
has_src_dict = target_history.with_deferred_dataset_for_test_file(
"1.bed", ext="bed", include_correct_hash=include_correct_hash
)
inputs = {
"input1": has_src_dict.src_dict,
}
output = required_tool.execute().with_inputs(inputs).assert_has_single_job.with_single_output
job = required_tool.execute(use_cached_job=use_cached_job).with_inputs(inputs).assert_has_single_job
output = job.with_single_output
output.assert_contains("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
if use_cached_job:
assert bool(job.final_details["copied_from_job_id"]) == expect_cached_job


@requires_tool_id("cat|cat1")
def test_deferred_with_cached_input(required_tool: RequiredTool, target_history: TargetHistory) -> None:
# Basic deferred dataset
_run_deferred(required_tool, target_history)
# Should just work because input is deferred
_run_deferred(required_tool, target_history, use_cached_job=True, expect_cached_job=True)
# Should fail to use the cached job because we don't have a source hash for deduplicating materialized dataset
_run_deferred(
required_tool, target_history, use_cached_job=True, expect_cached_job=False, include_correct_hash=False
)


@requires_tool_id("metadata_bam")
54 changes: 33 additions & 21 deletions lib/galaxy_test/base/populators.py
@@ -605,30 +605,32 @@ def fetch_hda(self, history_id: str, item: dict[str, Any], wait: bool = True) ->
assert len(hdas) == 1
return hdas[0]

def create_deferred_hda(self, history_id: str, uri: str, ext: Optional[str] = None) -> dict[str, Any]:
item = {
def create_deferred_hda(
self,
history_id: str,
uri: str,
name: Union[str, None] = None,
ext: Optional[str] = None,
hashes: Union[list[dict[str, str]], None] = None,
) -> dict[str, Any]:
item: dict[str, Any] = {
"name": name,
"src": "url",
"url": uri,
"deferred": True,
}
if ext:
item["ext"] = ext
if hashes:
item["hashes"] = hashes
output = self.fetch_hda(history_id, item)
details = self.get_history_dataset_details(history_id, dataset=output)
return details

def create_deferred_hda_with_hash(self, history_id: str, content: str) -> dict[str, Any]:
url = f"base64://{base64.b64encode(content.encode()).decode()}"
hashes = [{"hash_function": "SHA-1", "hash_value": hashlib.sha1(content.encode()).hexdigest()}]
item = {
"ext": "txt",
"src": "url",
"url": url,
"hashes": hashes,
"deferred": True,
}
output = self.fetch_hda(history_id, item)
details = self.get_history_dataset_details(history_id, dataset=output)
details = self.create_deferred_hda(history_id, url, ext="txt", hashes=hashes)
return details

def export_dataset_to_remote_file(self, history_id: str, content: str, name: str, target_uri: str):
@@ -1833,9 +1835,12 @@ def wait_on_download_request(self, storage_request_id: UUID) -> Response:
def base64_url_for_string(self, content: str) -> str:
return self.base64_url_for_bytes(content.encode("utf-8"))

def base64_url_for_test_file(self, test_filename: str) -> str:
def contents_for_test_file(self, test_filename: str) -> bytes:
test_data_resolver = TestDataResolver()
file_contents = open(test_data_resolver.get_filename(test_filename), "rb").read()
return open(test_data_resolver.get_filename(test_filename), "rb").read()

def base64_url_for_test_file(self, test_filename: str) -> str:
file_contents = self.contents_for_test_file(test_filename)
return self.base64_url_for_bytes(file_contents)

def base64_url_for_bytes(self, content: bytes) -> str:
@@ -4080,7 +4085,7 @@ def __init__(self, dataset_populator: BaseDatasetPopulator, history_id: str, job
def _wait_for(self):
if self._final_details is None:
self._dataset_populator.wait_for_job(self._job_id, assert_ok=False)
self._final_details = self._dataset_populator.get_job_details(self._job_id).json()
self._final_details = self._dataset_populator.get_job_details(self._job_id, full=True).json()

@property
def final_details(self) -> dict[str, Any]:
@@ -4489,27 +4494,34 @@ def with_dataset(
def with_deferred_dataset(
self,
uri: str,
named: Optional[str] = None,
name: Optional[str] = None,
ext: Optional[str] = None,
hashes: Union[list[dict[str, str]], None] = None,
) -> "HasSrcDict":
kwd = {}
if named is not None:
kwd["name"] = named
new_dataset = self._dataset_populator.create_deferred_hda(
history_id=self._history_id,
uri=uri,
name=name,
ext=ext,
hashes=hashes,
)
return HasSrcDict("hda", new_dataset)

def with_deferred_dataset_for_test_file(
self,
filename: str,
named: Optional[str] = None,
name: Optional[str] = None,
ext: Optional[str] = None,
include_correct_hash: bool = True,
) -> "HasSrcDict":
base64_url = self._dataset_populator.base64_url_for_test_file(filename)
return self.with_deferred_dataset(base64_url, named=named, ext=ext)
file_contents = self._dataset_populator.contents_for_test_file(filename)
base64_url = self._dataset_populator.base64_url_for_bytes(file_contents)
hashes = (
[{"hash_function": "SHA-1", "hash_value": hashlib.sha1(file_contents).hexdigest()}]
if include_correct_hash
else None
)
return self.with_deferred_dataset(base64_url, name=name, ext=ext, hashes=hashes)

def with_unpaired(self) -> "HasSrcDict":
return self._fetch_response(