From f0df4fb91bbfab8f7ee3b816e289c21b681864b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 10 Apr 2025 16:54:04 +0200 Subject: [PATCH 1/5] Migrate `JobFilesAPIController` to FastAPI (excluding TUS uploads) `FastAPIJobFiles` is the new, FastAPI version of `JobFilesAPIController`. The endpoints that have been migrated should exhibit exactly the same behavior as the old ones from `JobFilesAPIController`. Something to keep in mind is that while FastAPI has some extra built-in features that the legacy WSGI system did not have, such as answering HEAD requests, those do not work because of the way legacy WSGI endpoints are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), meaning that, for example, HEAD requests are passed to the `wsgi_handler` sub-application. Endpoints dedicated to TUS uploads work in tandem with the WSGI middleware `TusMiddleware` from the `tuswsgi` package. As explained above, WSGI middlewares and endpoints are injected into the FastAPI app after FastAPI routes as a single sub-application `wsgi_handler` using `app.mount("/", wsgi_handler)`, meaning that requests are passed to the `wsgi_handler` sub-application (and thus to `TusMiddleware`) only if there was no FastAPI endpoint defined to handle them. Therefore, they cannot be migrated to FastAPI unless `TusMiddleware` is also migrated to ASGI. 
--- client/src/api/schema/schema.ts | 226 +++++++++++++ lib/galaxy/webapps/galaxy/api/job_files.py | 374 +++++++++++++-------- lib/galaxy/webapps/galaxy/buildapp.py | 18 - test/integration/test_job_files.py | 105 +++++- 4 files changed, 556 insertions(+), 167 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 67f4cd7e1821..30a63c85fde2 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -2997,6 +2997,44 @@ export interface paths { patch?: never; trace?: never; }; + "/api/jobs/{job_id}/files": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. + */ + get: operations["index_api_jobs__job_id__files_get"]; + put?: never; + /** + * Populate an output file. + * @description Populate an output file (formal dataset, task split part, working directory file (such as those related to + * metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + * create. + * + * This API method is intended only for consumption by job runners, not end users. + */ + post: operations["create_api_jobs__job_id__files_post"]; + delete?: never; + options?: never; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. 
+ */ + head: operations["index_api_jobs__job_id__files_get"]; + patch?: never; + trace?: never; + }; "/api/jobs/{job_id}/inputs": { parameters: { query?: never; @@ -6628,6 +6666,34 @@ export interface components { /** Name */ name?: unknown; }; + /** Body_create_api_jobs__job_id__files_post */ + Body_create_api_jobs__job_id__files_post: { + /** + * File + * Format: binary + */ + __file?: string; + /** File Path */ + __file_path?: string; + /** + * File + * Format: binary + * @description Contents of the file to create. + */ + file?: string; + /** + * Job Key + * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. + */ + job_key: string; + /** + * Path + * @description Path to file to create. + */ + path: string; + /** Session Id */ + session_id?: string; + }; /** Body_create_form_api_libraries__library_id__contents_post */ Body_create_form_api_libraries__library_id__contents_post: { /** Create Type */ @@ -31164,6 +31230,166 @@ export interface operations { }; }; }; + index_api_jobs__job_id__files_get: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. 
*/ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + create_api_jobs__job_id__files_post: { + parameters: { + query?: never; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody: { + content: { + "multipart/form-data": components["schemas"]["Body_create_api_jobs__job_id__files_post"]; + }; + }; + responses: { + /** @description An okay message. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + index_api_jobs__job_id__files_get: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. 
*/ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; get_inputs_api_jobs__job_id__inputs_get: { parameters: { query?: never; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 31a4974376e7..70fb7b061920 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -1,11 +1,26 @@ -"""API for asynchronous job running mechanisms can use to fetch or put files -related to running and queued jobs. +""" +API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs. 
""" import logging import os import re import shutil +from typing import ( + cast, + IO, + Optional, +) +from urllib.parse import unquote + +from fastapi import ( + File, + Form, + Path, + Query, + UploadFile, +) +from typing_extensions import Annotated from galaxy import ( exceptions, @@ -13,55 +28,90 @@ ) from galaxy.managers.context import ProvidesAppContext from galaxy.model import Job -from galaxy.web import ( - expose_api_anonymous_and_sessionless, - expose_api_raw_anonymous_and_sessionless, +from galaxy.web import expose_api_anonymous_and_sessionless +from galaxy.webapps.base.api import GalaxyFileResponse +from galaxy.webapps.galaxy.api import ( + DependsOnTrans, + Router, ) from . import BaseGalaxyAPIController +__all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") + log = logging.getLogger(__name__) -class JobFilesAPIController(BaseGalaxyAPIController): - """This job files controller allows remote job running mechanisms to - read and modify the current state of files for queued and running jobs. - It is certainly not meant to represent part of Galaxy's stable, user - facing API. - - Furthermore, even if a user key corresponds to the user running the job, - it should not be accepted for authorization - this API allows access to - low-level unfiltered files and such authorization would break Galaxy's - security model for tool execution. - """ +router = Router( + # keep the endpoint in the undocumented section of the API docs `/api/docs`, as all endpoints from `FastAPIJobFiles` + # are certainly not meant to represent part of Galaxy's stable, user facing API + tags=["undocumented"] +) - @expose_api_raw_anonymous_and_sessionless - def index(self, trans: ProvidesAppContext, job_id, **kwargs): - """ - GET /api/jobs/{job_id}/files - Get a file required to staging a job (proper datasets, extra inputs, - task-split inputs, working directory files). 
+@router.cbv +class FastAPIJobFiles: + """ + This job files controller allows remote job running mechanisms to read and modify the current state of files for + queued and running jobs. It is certainly not meant to represent part of Galaxy's stable, user facing API. - :type job_id: str - :param job_id: encoded id string of the job - :type path: str - :param path: Path to file. - :type job_key: str - :param job_key: A key used to authenticate this request as acting on - behalf or a job runner for the specified job. + Furthermore, even if a user key corresponds to the user running the job, it should not be accepted for authorization + - this API allows access to low-level unfiltered files and such authorization would break Galaxy's security model + for tool execution. + """ - ..note: - This API method is intended only for consumption by job runners, - not end users. + # FastAPI answers HEAD requests automatically for GET endpoints. However, because of the way legacy WSGI endpoints + # are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), the built-in support for `HEAD` requests + # breaks, because such requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still + # needs to include some code to handle this behavior, as tests existing before the migration to FastAPI expect HEAD + # requests to work. + # + # @router.get( # use me when ALL endpoints have been migrated to FastAPI + @router.api_route( + "/api/jobs/{job_id}/files", + summary="Get a file required to staging a job.", + responses={ + 200: { + "description": "Contents of file.", + "content": {"application/json": None, "application/octet-stream": {"example": None}}, + }, + 400: { + "description": ( + "File not found, path does not refer to a file, or input dataset(s) for job have been purged." 
+ ) + }, + }, + methods=["GET", "HEAD"], # remove me when ALL endpoints have been migrated to FastAPI + ) + def index( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[ + str, + Query( + description="Path to file.", + ), + ], + job_key: Annotated[ + str, + Query( + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ], + trans: ProvidesAppContext = DependsOnTrans, + ) -> GalaxyFileResponse: + """ + Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + files). - :rtype: binary - :returns: contents of file + This API method is intended only for consumption by job runners, not end users. """ - job = self.__authorize_job_access(trans, job_id, **kwargs) - path = kwargs["path"] - try: - return open(path, "rb") - except FileNotFoundError: + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) + + if not os.path.exists(path): # We know that the job is not terminal, but users (or admin scripts) can purge input datasets. # Here we discriminate that case from truly unexpected bugs. # Not failing the job here, this is or should be handled by pulsar. @@ -70,55 +120,74 @@ def index(self, trans: ProvidesAppContext, job_id, **kwargs): # This looks like a galaxy dataset, check if any job input has been deleted. 
if any(jtid.dataset.dataset.purged for jtid in job.input_datasets): raise exceptions.ItemDeletionException("Input dataset(s) for job have been purged.") - else: - raise - @expose_api_anonymous_and_sessionless - def create(self, trans, job_id, payload, **kwargs): + return GalaxyFileResponse(path) + + @router.post( + "/api/jobs/{job_id}/files", + summary="Populate an output file.", + responses={ + 200: {"description": "An okay message.", "content": {"application/json": {"example": {"message": "ok"}}}}, + }, + ) + def create( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[str, Form(description="Path to file to create.")], + job_key: Annotated[ + str, + Form( + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ) + ), + ], + file: UploadFile = File(None, description="Contents of the file to create."), + session_id: str = Form(None), + nginx_upload_module_file_path: str = Form( + None, + alias="__file_path", + validation_alias="__file_path", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + underscore_file: UploadFile = File( + None, + alias="__file", + validation_alias="__file", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + trans: ProvidesAppContext = DependsOnTrans, + ): """ - create( self, trans, job_id, payload, **kwargs ) - * POST /api/jobs/{job_id}/files - Populate an output file (formal dataset, task split part, working - directory file (such as those related to metadata)). This should be - a multipart post with a 'file' parameter containing the contents of - the actual file to create. 
- - :type job_id: str - :param job_id: encoded id string of the job - :type payload: dict - :param payload: dictionary structure containing:: - 'job_key' = Key authenticating - 'path' = Path to file to create. - - ..note: - This API method is intended only for consumption by job runners, - not end users. - - :rtype: dict - :returns: an okay message + Populate an output file (formal dataset, task split part, working directory file (such as those related to + metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + create. + + This API method is intended only for consumption by job runners, not end users. """ - job = self.__authorize_job_access(trans, job_id, **payload) - path = payload.get("path") - if not path: - raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.") + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) self.__check_job_can_write_to_path(trans, job, path) + input_file: Optional[IO[bytes]] = None + input_file_path: Optional[str] = None # Is this writing an unneeded file? Should this just copy in Python? 
- if "__file_path" in payload: - file_path = payload.get("__file_path") + if nginx_upload_module_file_path: + input_file_path = nginx_upload_module_file_path upload_store = trans.app.config.nginx_upload_job_files_store assert upload_store, ( "Request appears to have been processed by" " nginx_upload_module but Galaxy is not" " configured to recognize it" ) - assert file_path.startswith( + assert input_file_path.startswith( upload_store - ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})" - input_file = open(file_path) - elif "session_id" in payload: + ), f"Filename provided by nginx ({input_file_path}) is not in correct directory ({upload_store})" + elif session_id: # code stolen from basic.py - session_id = payload["session_id"] upload_store = ( trans.app.config.tus_upload_store_job_files or trans.app.config.tus_upload_store @@ -127,76 +196,41 @@ def create(self, trans, job_id, payload, **kwargs): if re.match(r"^[\w-]+$", session_id) is None: raise ValueError("Invalid session id format.") local_filename = os.path.abspath(os.path.join(upload_store, session_id)) - input_file = open(local_filename) + input_file_path = local_filename + elif file: + input_file = file.file + elif underscore_file: + input_file = underscore_file.file else: - input_file = payload.get("file", payload.get("__file", None)).file + raise exceptions.RequestParameterMissingException("No file uploaded.") + target_dir = os.path.dirname(path) util.safe_makedirs(target_dir) - try: - if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): - with open(path, "ab") as destination: - shutil.copyfileobj(open(input_file.name, "rb"), destination) + if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): + with open(path, "ab") as destination: + if input_file_path: + with open(input_file_path, "rb") as input_file_handle: + shutil.copyfileobj(input_file_handle, destination) + else: + 
shutil.copyfileobj(cast(IO[bytes], input_file), destination) + else: + # prior to migrating to FastAPI, this operation was done more efficiently for all cases using + # `shutil.move(input_file_path, path)`, but FastAPI stores the uploaded file as + # `tempfile.SpooledTemporaryFile` + # (https://docs.python.org/3/library/tempfile.html#tempfile.SpooledTemporaryFile), so now there is not even + # a path where uploaded files can be accessed on disk + if input_file_path: + shutil.move(input_file_path, path) else: - shutil.move(input_file.name, path) - finally: - try: - input_file.close() - except OSError: - # Fails to close file if not using nginx upload because the - # tempfile has moved and Python wants to delete it. - pass - return {"message": "ok"} - - @expose_api_anonymous_and_sessionless - def tus_patch(self, trans, **kwds): - """ - Exposed as PATCH /api/job_files/resumable_upload. - - I think based on the docs, a separate tusd server is needed for job files if - also hosting one for use facing uploads. - - Setting up tusd for job files should just look like (I think): - - tusd -host localhost -port 1080 -upload-dir=/database/tmp - - See more discussion of checking upload access, but we shouldn't need the - API key and session stuff the user upload tusd server should be configured with. - - Also shouldn't need a hooks endpoint for this reason but if you want to add one - the target CLI entry would be -hooks-http=/api/job_files/tus_hooks - and the action is featured below. - - I would love to check the job state with __authorize_job_access on the first - POST but it seems like TusMiddleware doesn't default to coming in here for that - initial POST the way it does for the subsequent PATCHes. Ultimately, the upload - is still authorized before the write done with POST /api/jobs//files - so I think there is no route here to mess with user data - the worst of the security - issues that can be caused is filling up the sever with needless files that aren't - acted on. 
Since this endpoint is not meant for public consumption - all the job - files stuff and the TUS server should be blocked to public IPs anyway and restricted - to your Pulsar servers and similar targeting could be accomplished with a user account - and the user facing upload endpoints. - """ - return None - - @expose_api_anonymous_and_sessionless - def tus_hooks(self, trans, **kwds): - """No-op but if hook specified the way we do for user upload it would hit this action. - - Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for - tus_patch. - """ - pass + with open(path, "wb") as destination: + shutil.copyfileobj(cast(IO[bytes], input_file), destination) - def __authorize_job_access(self, trans, encoded_job_id, **kwargs): - for key in ["path", "job_key"]: - if key not in kwargs: - error_message = f"Job files action requires a valid '{key}'." - raise exceptions.ObjectAttributeMissingException(error_message) + return {"message": "ok"} + def __authorize_job_access(self, trans, encoded_job_id, path, job_key): job_id = trans.security.decode_id(encoded_job_id) - job_key = trans.security.encode_id(job_id, kind="jobs_files") - if not util.safe_str_cmp(str(kwargs["job_key"]), job_key): + job_key_from_job_id = trans.security.encode_id(job_id, kind="jobs_files") + if not util.safe_str_cmp(str(job_key), job_key_from_job_id): raise exceptions.ItemAccessibilityException("Invalid job_key supplied.") # Verify job is active. Don't update the contents of complete jobs. @@ -207,9 +241,9 @@ def __authorize_job_access(self, trans, encoded_job_id, **kwargs): return job def __check_job_can_write_to_path(self, trans, job, path): - """Verify an idealized job runner should actually be able to write to - the specified path - it must be a dataset output, a dataset "extra - file", or a some place in the working directory of this job. 
+ """ + Verify an idealized job runner should actually be able to write to the specified path - it must be a dataset + output, a dataset "extra file", or a some place in the working directory of this job. Would like similar checks for reading the unstructured nature of loc files make this very difficult. (See abandoned work here @@ -220,8 +254,8 @@ def __check_job_can_write_to_path(self, trans, job, path): raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.") def __is_output_dataset_path(self, job, path): - """Check if is an output path for this job or a file in the an - output's extra files path. + """ + Check if is an output path for this job or a file in the output's extra files path. """ da_lists = [job.output_datasets, job.output_library_datasets] for da_list in da_lists: @@ -240,3 +274,59 @@ def __in_working_directory(self, job, path, app): job, base_dir="job_work", dir_only=True, extra_dir=str(job.id) ) return util.in_directory(path, working_directory) + + +class JobFilesAPIController(BaseGalaxyAPIController): + """ + Legacy WSGI endpoints dedicated to TUS uploads. + + TUS upload endpoints work in tandem with the WSGI middleware `TusMiddleware` from the `tuswsgi` package. Both + WSGI middlewares and endpoints are injected into the FastAPI app after FastAPI routes as a single sub-application + `wsgi_handler` using `app.mount("/", wsgi_handler)`, meaning that requests are passed to the `wsgi_handler` + sub-application (and thus to `TusMiddleware`) only if there was no FastAPI endpoint defined to handle them. + + Therefore, these legacy WSGI endpoints cannot be migrated to FastAPI unless `TusMiddleware` is migrated to ASGI. + """ + + @expose_api_anonymous_and_sessionless + def tus_patch(self, trans, **kwds): + """ + Exposed as PATCH /api/job_files/resumable_upload. + + I think based on the docs, a separate tusd server is needed for job files if + also hosting one for use facing uploads. 
+ + Setting up tusd for job files should just look like (I think): + + `tusd -host localhost -port 1080 -upload-dir=/database/tmp` + + See more discussion of checking upload access, but we shouldn't need the + API key and session stuff the user upload tusd server should be configured with. + + Also shouldn't need a hooks endpoint for this reason but if you want to add one + the target CLI entry would be `-hooks-http=/api/job_files/tus_hooks` + and the action is featured below. + + I would love to check the job state with `__authorize_job_access` on the first + POST but it seems like `TusMiddleware` doesn't default to coming in here for that + initial POST the way it does for the subsequent PATCHes. Ultimately, the upload + is still authorized before the write done with POST `/api/jobs//files` + so I think there is no route here to mess with user data - the worst of the security + issues that can be caused is filling up the sever with needless files that aren't + acted on. Since this endpoint is not meant for public consumption - all the job + files stuff and the TUS server should be blocked to public IPs anyway and restricted + to your Pulsar servers and similar targeting could be accomplished with a user account + and the user facing upload endpoints. + """ + ... + return None + + @expose_api_anonymous_and_sessionless + def tus_hooks(self, trans, **kwds): + """ + No-op but if hook specified the way we do for user upload it would hit this action. + + Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for tus_patch. + """ + ... + return None diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index e603abf6a1ef..ad1962de7188 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -870,24 +870,6 @@ def populate_api_routes(webapp, app): conditions=dict(method=["GET"]), ) - # Job files controllers. Only for consumption by remote job runners. 
- webapp.mapper.resource( - "file", - "files", - controller="job_files", - name_prefix="job_", - path_prefix="/api/jobs/{job_id}", - parent_resources=dict(member_name="job", collection_name="jobs"), - ) - - webapp.mapper.connect( - "index", - "/api/jobs/{job_id}/files", - controller="job_files", - action="index", - conditions=dict(method=["HEAD"]), - ) - webapp.mapper.resource( "port", "ports", diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 4f244419156b..1d6a450211dd 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -17,8 +17,14 @@ import io import os +import shutil import tempfile -from typing import Dict +from typing import ( + Any, + Dict, + IO, + Optional, +) import requests from sqlalchemy import select @@ -42,26 +48,39 @@ class TestJobFilesIntegration(integration_util.IntegrationTestCase): initialized = False dataset_populator: DatasetPopulator + input_hda: model.HistoryDatasetAssociation + input_hda_dict: Dict[str, Any] + _nginx_upload_job_files_store: str + @classmethod def handle_galaxy_config_kwds(cls, config): super().handle_galaxy_config_kwds(config) + config["job_config_file"] = SIMPLE_JOB_CONFIG_FILE config["object_store_store_by"] = "uuid" config["server_name"] = "files" + config["nginx_upload_job_files_store"] = tempfile.mkdtemp() + cls._nginx_upload_job_files_store = config["nginx_upload_job_files_store"] cls.initialized = False + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls._nginx_upload_job_files_store) + super().tearDownClass() + def setUp(self): super().setUp() - self.dataset_populator = DatasetPopulator(self.galaxy_interactor) - if not TestJobFilesIntegration.initialized: - history_id = self.dataset_populator.new_history() + cls = TestJobFilesIntegration + cls.dataset_populator = DatasetPopulator(self.galaxy_interactor) + if not cls.initialized: + history_id = cls.dataset_populator.new_history() sa_session = self.sa_session stmt = 
select(model.HistoryDatasetAssociation) assert len(sa_session.scalars(stmt).all()) == 0 - self.input_hda_dict = self.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) + cls.input_hda_dict = cls.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) assert len(sa_session.scalars(stmt).all()) == 1 - self.input_hda = sa_session.scalars(stmt).all()[0] - TestJobFilesIntegration.initialized = True + cls.input_hda = sa_session.scalars(stmt).all()[0] + cls.initialized = True def test_read_by_state(self): job, _, _ = self.create_static_job_with_state("running") @@ -160,6 +179,78 @@ def test_write_with_tus(self): api_asserts.assert_status_code_is_ok(response) assert open(path).read() == "some initial text data" + def test_write_with_nginx_upload_module(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + file: Optional[IO[bytes]] = None + try: + with open(os.path.join(self._app.config.nginx_upload_job_files_store, "nginx_upload"), "wb") as file: + file.write(b"some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, __file_path=file.name)) + + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(file.name) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `file.name` + try: + if file is not None: + os.remove(file.name) + except FileNotFoundError: + pass + + def test_write_with_session_id(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": 
path, "job_key": job_key} + + upload_store = ( + self._app.config.tus_upload_store_job_files + or self._app.config.tus_upload_store + or self._app.config.new_file_path + ) + upload_id = "35a7c8d3-e659-430e-8579-8d085e7e569d" + upload_path = os.path.join(upload_store, "35a7c8d3-e659-430e-8579-8d085e7e569d") + try: + with open(upload_path, "w") as upload_file: + upload_file.write("some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, session_id=upload_id)) + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(upload_path) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `upload_path` + try: + os.remove(upload_path) + except FileNotFoundError: + pass + + def test_write_with_underscored_file_param(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data, files={"__file": io.StringIO("some initial text data")}) + api_asserts.assert_status_code_is_ok(response) + assert open(path).read() == "some initial text data" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job) From 4b8b2b9d80165cf05a50d5232c8437aff022bb1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 15 May 2025 14:30:46 +0200 Subject: [PATCH 2/5] Generate unique operation ids for `/api/jobs/{job_id}/files` Work around a bug in FastAPI (https://github.com/fastapi/fastapi/issues/13175) that assigns the same operation id to both request methods GET and HEAD of the endpoint 
`/api/jobs/{job_id}/files` when using the `@router.api_route()` decorator with `methods=["GET", "HEAD"]` as a keyword argument. --- client/src/api/schema/schema.ts | 4 +-- lib/galaxy/webapps/galaxy/api/job_files.py | 38 +++++++++++++--------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 30a63c85fde2..e130ae725592 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -3031,7 +3031,7 @@ export interface paths { * * This API method is intended only for consumption by job runners, not end users. */ - head: operations["index_api_jobs__job_id__files_get"]; + head: operations["index_api_jobs__job_id__files_head"]; patch?: never; trace?: never; }; @@ -31334,7 +31334,7 @@ export interface operations { }; }; }; - index_api_jobs__job_id__files_get: { + index_api_jobs__job_id__files_head: { parameters: { query: { /** @description Path to file. */ diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 70fb7b061920..6cfbd67226f0 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -65,23 +65,29 @@ class FastAPIJobFiles: # needs to include some code to handle this behavior, as tests existing before the migration to FastAPI expect HEAD # requests to work. # - # @router.get( # use me when ALL endpoints have been migrated to FastAPI - @router.api_route( - "/api/jobs/{job_id}/files", - summary="Get a file required to staging a job.", - responses={ - 200: { - "description": "Contents of file.", - "content": {"application/json": None, "application/octet-stream": {"example": None}}, - }, - 400: { - "description": ( - "File not found, path does not refer to a file, or input dataset(s) for job have been purged."
- ) - }, - }, - methods=["GET", "HEAD"], # remove me when ALL endpoints have been migrated to FastAPI + @router.get( + # simplify me (remove `_args` and `_kwargs` defined using the walrus operator) when ALL endpoints have been + # migrated to FastAPI, this is a workaround for FastAPI bug https://github.com/fastapi/fastapi/issues/13175 + *(_args := ["/api/jobs/{job_id}/files"]), + **( + _kwargs := dict( + summary="Get a file required to staging a job.", + responses={ + 200: { + "description": "Contents of file.", + "content": {"application/json": None, "application/octet-stream": {"example": None}}, + }, + 400: { + "description": ( + "File not found, path does not refer to a file, or input dataset(s) for job have been purged." + ) + }, + }, + ) + ), ) + @router.head(*_args, **_kwargs) # type: ignore[name-defined] + # remove `@router.head(...)` when ALL endpoints have been migrated to FastAPI def index( self, job_id: Annotated[str, Path(description="Encoded id string of the job.")], From d17002d62b94dc364220401e8a2350ddc2f230bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 20 May 2025 11:46:51 +0200 Subject: [PATCH 3/5] Accept `path` and `job_key` both as query and form parameters for POST requests to `/api/jobs/{job_id}/files` Pulsar formats the `path` and `job_key` parameters as query parameters when submitting POST requests to `/api/jobs/{job_id}/files`. However, many Galaxy tests format them as form parameters. The only way to keep the endpoint working as it should (as it worked before the migration to FastAPI) is to accept both query and form parameters. 
--- client/src/api/schema/schema.ts | 17 +++--- lib/galaxy/webapps/galaxy/api/job_files.py | 61 ++++++++++++++++++---- 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index e130ae725592..3fcc57611df2 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -6682,15 +6682,15 @@ export interface components { */ file?: string; /** - * Job Key + * Job Key Form * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ - job_key: string; + job_key_form?: string | null; /** - * Path + * Path Form * @description Path to file to create. */ - path: string; + path_form?: string | null; /** Session Id */ session_id?: string; }; @@ -31288,7 +31288,12 @@ export interface operations { }; create_api_jobs__job_id__files_post: { parameters: { - query?: never; + query?: { + /** @description Path to file to create. */ + path?: string | null; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key?: string | null; + }; header?: { /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. 
*/ "run-as"?: string | null; @@ -31299,7 +31304,7 @@ export interface operations { }; cookie?: never; }; - requestBody: { + requestBody?: { content: { "multipart/form-data": components["schemas"]["Body_create_api_jobs__job_id__files_post"]; }; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 6cfbd67226f0..954dd698dd48 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -18,8 +18,10 @@ Form, Path, Query, + Request, UploadFile, ) +from fastapi.params import Depends from typing_extensions import Annotated from galaxy import ( @@ -48,6 +50,54 @@ ) +def path_query_or_form( + request: Request, + path_query: Annotated[Optional[str], Query(alias="path", description="Path to file to create.")] = None, + path_form: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, +): + """ + Accept `path` parameter both in query and form format. + + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `path: str = Depends(path_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not + provided. + """ + return path_query or path_form + + +def job_key_query_or_form( + request: Request, + job_key_query: Annotated[ + Optional[str], + Query( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, + job_key_form: Annotated[ + Optional[str], + Form( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, +): + """ + Accept `job_key` parameter both in query and form format. 
+ + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `job_key: str = Depends(job_key_query_or_form)` so that FastAPI responds with status code 500 when the parameter is + not provided. + """ + return job_key_query or job_key_form + + @router.cbv class FastAPIJobFiles: """ @@ -139,15 +189,8 @@ def index( def create( self, job_id: Annotated[str, Path(description="Encoded id string of the job.")], - path: Annotated[str, Form(description="Path to file to create.")], - job_key: Annotated[ - str, - Form( - description=( - "A key used to authenticate this request as acting on behalf or a job runner for the specified job." - ) - ), - ], + path: Annotated[str, Depends(path_query_or_form)], + job_key: Annotated[str, Depends(job_key_query_or_form)], file: UploadFile = File(None, description="Contents of the file to create."), session_id: str = Form(None), nginx_upload_module_file_path: str = Form( From 071d2d5e45314722b011af739a3a2e27dff172f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 20 May 2025 15:56:51 +0200 Subject: [PATCH 4/5] Fix OpenAPI docs for `path` and `job_key` as form parameters for POST requests to `/api/jobs/{job_id}/files` FastAPI will not use the parameter aliases of form parameters in the OpenAPI docs, but the name of their Python variables. Therefore, the API docs show `path_form` and `job_key_form`. Rename them so that the API docs show the correct parameter names. 
--- client/src/api/schema/schema.ts | 8 ++++---- lib/galaxy/webapps/galaxy/api/job_files.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 3fcc57611df2..0d87592004a1 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -6682,15 +6682,15 @@ export interface components { */ file?: string; /** - * Job Key Form + * Job Key * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ - job_key_form?: string | null; + job_key?: string | null; /** - * Path Form + * Path * @description Path to file to create. */ - path_form?: string | null; + path?: string | null; /** Session Id */ session_id?: string; }; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 954dd698dd48..a8d3e792cba9 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -53,7 +53,7 @@ def path_query_or_form( request: Request, path_query: Annotated[Optional[str], Query(alias="path", description="Path to file to create.")] = None, - path_form: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, + path: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, ): """ Accept `path` parameter both in query and form format. @@ -63,7 +63,7 @@ def path_query_or_form( `path: str = Depends(path_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not provided. 
""" - return path_query or path_form + return path_query or path def job_key_query_or_form( @@ -77,7 +77,7 @@ def job_key_query_or_form( ), ), ] = None, - job_key_form: Annotated[ + job_key: Annotated[ Optional[str], Form( alias="job_key", @@ -95,7 +95,7 @@ def job_key_query_or_form( `job_key: str = Depends(job_key_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not provided. """ - return job_key_query or job_key_form + return job_key_query or job_key @router.cbv From f55352466fa2a65c4bae315e0af1eefe497a3465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 22 May 2025 11:31:47 +0200 Subject: [PATCH 5/5] Improve compliance of `FastAPIJobFiles` with the HTTP spec (`files` endpoint) This commit introduces the following differences in behavior: - GET and HEAD requests to `/api/jobs/{job_id}/files` return HTTP status code 400 when the given path does not refer to a file or the input datasets for the job have been purged and 404 when the given path does not exist. - PROPFIND requests to `/api/jobs/{job_id}/files` are answered with HTTP status code 501 (read motivation for this change below). - POST requests to `/api/jobs/{job_id}/files` are answered with HTTP status code 400 when no file is provided. The reason behind the code explicitly answering `PROPFIND` requests with status code 501 is an unfortunate interaction between the ARC remote job runner that is under development, the behavior of legacy API endpoints and how they are integrated within the FastAPI app. The ARC remote job runner (which will be implemented as `lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects this endpoint to return HTTP codes other than 404 when `PROPFIND` requests are issued. They are not part of the HTTP spec, but they are used in the WebDAV protocol. The correct answer to such requests is likely 501 (not implemented). 
FastAPI returns HTTP 405 (method not allowed) for `PROPFIND`, which may not be fully correct but is tolerable because it is one less quirk to maintain. However, because of the way legacy WSGI endpoints are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), the built-in support for returning HTTP 405 for `PROPFIND` breaks, because such requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still needs to include some code to handle this behavior. When ALL routes have been migrated to ASGI (no WSGI handler sub-application needed anymore), some lines of code can be removed; they are labeled using comments. --- client/src/api/schema/schema.ts | 47 ++++++++++++- lib/galaxy/webapps/galaxy/api/__init__.py | 4 ++ lib/galaxy/webapps/galaxy/api/job_files.py | 50 +++++++++++-- lib/galaxy/work/context.py | 5 ++ test/integration/test_job_files.py | 81 +++++++++++++++++----- 5 files changed, 165 insertions(+), 22 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 0d87592004a1..9e28da862c5c 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -31259,8 +31259,26 @@ export interface operations { "application/octet-stream": unknown; }; }; - /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + /** @description Path does not refer to a file, or input dataset(s) for job have been purged. */ 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description File not found. */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Input dataset(s) for job have been purged. */ + 500: { headers: { [name: string]: unknown; }; @@ -31319,6 +31337,13 @@ export interface operations { "application/json": unknown; }; }; + /** @description Bad request (including no file provided).
*/ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; /** @description Request Error */ "4XX": { headers: { @@ -31368,8 +31393,26 @@ export interface operations { "application/octet-stream": unknown; }; }; - /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + /** @description Path does not refer to a file, or input dataset(s) for job have been purged. */ 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description File not found. */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Input dataset(s) for job have been purged. */ + 500: { headers: { [name: string]: unknown; }; diff --git a/lib/galaxy/webapps/galaxy/api/__init__.py b/lib/galaxy/webapps/galaxy/api/__init__.py index 51066fed89c1..19097dd34ae7 100644 --- a/lib/galaxy/webapps/galaxy/api/__init__.py +++ b/lib/galaxy/webapps/galaxy/api/__init__.py @@ -265,6 +265,10 @@ def environ(self) -> Environ: self.__environ = build_environ(self.__request.scope, None) # type: ignore[arg-type] return self.__environ + @property + def method(self): + return self.__request.method + @property def headers(self): return self.__request.headers diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index a8d3e792cba9..b429d2179057 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -36,6 +36,7 @@ DependsOnTrans, Router, ) +from galaxy.work.context import SessionRequestContext from . 
import BaseGalaxyAPIController __all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") @@ -128,16 +129,48 @@ class FastAPIJobFiles: "content": {"application/json": None, "application/octet-stream": {"example": None}}, }, 400: { - "description": ( - "File not found, path does not refer to a file, or input dataset(s) for job have been purged." - ) + "description": "Path does not refer to a file, or input dataset(s) for job have been purged.", + "content": { + "application/json": { + "example": { + "detail": ( + "Path does not refer to a file, or input dataset(s) for job have been purged." + ) + }, + } + }, }, + 404: { + "description": "File not found.", + "content": { + "application/json": { + "example": {"detail": "File not found."}, + } + }, + }, + 500: {"description": "Input dataset(s) for job have been purged."}, }, ) ), ) @router.head(*_args, **_kwargs) # type: ignore[name-defined] # remove `@router.head(...)` when ALL endpoints have been migrated to FastAPI + @router.api_route( + *_args, # type: ignore[name-defined] + **{key: value for key, value in _kwargs.items() if key != "responses"}, # type: ignore[name-defined] + responses={501: {"description": "Not implemented."}}, + methods=["PROPFIND"], + include_in_schema=False, + ) + # remove `@router.api_route(..., methods=["PROPFIND"])` when ALL endpoints have been migrated to FastAPI + # The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects this to return HTTP codes + # other than 404 when `PROPFIND` requests are issued. They are not part of the HTTP spec, but they are used in the + # WebDAV protocol. The correct answer to such requests is likely 501 (not implemented). FastAPI returns HTTP 405 + # (method not allowed) for `PROPFIND`, which maybe is not fully correct but tolerable because it is one less quirk + # to maintain. 
However, because of the way legacy WSGI endpoints are injected into the FastAPI app (using + # `app.mount("/", wsgi_handler)`), the built-in support for returning HTTP 405 for `PROPFIND` breaks, because such + # requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still needs to include + # some code to handle this behavior. def index( self, job_id: Annotated[str, Path(description="Encoded id string of the job.")], @@ -155,7 +188,7 @@ def index( ), ), ], - trans: ProvidesAppContext = DependsOnTrans, + trans: SessionRequestContext = DependsOnTrans, ) -> GalaxyFileResponse: """ Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory @@ -163,6 +196,11 @@ def index( This API method is intended only for consumption by job runners, not end users. """ + # PROPFIND is not implemented, but the endpoint needs to return a non-404 error code for it + # remove me when ALL endpoints have been migrated to FastAPI + if trans.request.method == "PROPFIND": + raise exceptions.NotImplemented() + path = unquote(path) job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) @@ -176,6 +214,9 @@ def index( # This looks like a galaxy dataset, check if any job input has been deleted. 
if any(jtid.dataset.dataset.purged for jtid in job.input_datasets): raise exceptions.ItemDeletionException("Input dataset(s) for job have been purged.") + raise exceptions.ObjectNotFound("File not found.") + elif not os.path.isfile(path): + raise exceptions.RequestParameterInvalidException("Path does not refer to a file.") return GalaxyFileResponse(path) @@ -184,6 +225,7 @@ def index( summary="Populate an output file.", responses={ 200: {"description": "An okay message.", "content": {"application/json": {"example": {"message": "ok"}}}}, + 400: {"description": "Bad request (including no file provided)."}, }, ) def create( diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py index e723e069109c..6584f2d9783f 100644 --- a/lib/galaxy/work/context.py +++ b/lib/galaxy/work/context.py @@ -97,6 +97,11 @@ def url_path(self) -> str: def host(self) -> str: """The host address.""" + @property + @abc.abstractmethod + def method(self) -> str: + """The request's HTTP method.""" + @property @abc.abstractmethod def is_secure(self) -> bool: diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 1d6a450211dd..ba9cbac168c0 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -48,6 +48,7 @@ class TestJobFilesIntegration(integration_util.IntegrationTestCase): initialized = False dataset_populator: DatasetPopulator + hist_id: int # cannot use `history_id` as name, it collides with a pytest fixture input_hda: model.HistoryDatasetAssociation input_hda_dict: Dict[str, Any] _nginx_upload_job_files_store: str @@ -61,7 +62,6 @@ def handle_galaxy_config_kwds(cls, config): config["server_name"] = "files" config["nginx_upload_job_files_store"] = tempfile.mkdtemp() cls._nginx_upload_job_files_store = config["nginx_upload_job_files_store"] - cls.initialized = False @classmethod def tearDownClass(cls): @@ -70,17 +70,14 @@ def tearDownClass(cls): def setUp(self): super().setUp() - cls = TestJobFilesIntegration - 
cls.dataset_populator = DatasetPopulator(self.galaxy_interactor) - if not cls.initialized: - history_id = cls.dataset_populator.new_history() - sa_session = self.sa_session - stmt = select(model.HistoryDatasetAssociation) - assert len(sa_session.scalars(stmt).all()) == 0 - cls.input_hda_dict = cls.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) - assert len(sa_session.scalars(stmt).all()) == 1 - cls.input_hda = sa_session.scalars(stmt).all()[0] - cls.initialized = True + self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + history_id_encoded = self.dataset_populator.new_history() + self.hist_id = self._app.security.decode_id(history_id_encoded) + self.input_hda_dict = self.dataset_populator.new_dataset(history_id_encoded, content=TEST_INPUT_TEXT, wait=True) + sa_session = self.sa_session + stmt = select(model.HistoryDatasetAssociation).where(model.HistoryDatasetAssociation.history_id == self.hist_id) + assert len(sa_session.scalars(stmt).all()) == 1 + self.input_hda = sa_session.scalars(stmt).first() def test_read_by_state(self): job, _, _ = self.create_static_job_with_state("running") @@ -115,9 +112,57 @@ def test_read_fails_if_input_file_purged(self): self.input_hda_dict["history_id"], content_id=self.input_hda_dict["id"], purge=True, wait_for_purge=True ) assert delete_response.status_code == 200 - head_response = requests.get(get_url, params=data) + response = requests.get(get_url, params=data) + assert response.status_code == 400 + assert response.json()["err_msg"] == "Input dataset(s) for job have been purged." 
+ + def test_read_missing_file(self): + job, _, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + data = {"path": self.input_hda.get_file_name() + "_missing", "job_key": job_key} + get_url = self._api_url(f"jobs/{job_id}/files", use_key=True) + + head_response = requests.head(get_url, params=data) + assert head_response.status_code == 404 + + response = requests.get(get_url, params=data) + assert response.status_code == 404 + + def test_read_folder(self): + job, _, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + data = {"path": os.path.dirname(self.input_hda.get_file_name()), "job_key": job_key} + get_url = self._api_url(f"jobs/{job_id}/files", use_key=True) + + head_response = requests.head(get_url, params=data) assert head_response.status_code == 400 - assert head_response.json()["err_msg"] == "Input dataset(s) for job have been purged." + + response = requests.get(get_url, params=data) + assert response.status_code == 400 + + def test_write_no_file(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data) + assert response.status_code == 400 + + def test_propfind(self): + # remove this test when ALL Galaxy endpoints have been migrated to FastAPI; it will then be FastAPI's + # responsibility to return a status code other than 404 + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + propfind_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = 
requests.request("PROPFIND", propfind_url, params=data) + assert response.status_code == 501 def test_write_by_state(self): job, output_hda, working_directory = self.create_static_job_with_state("running") @@ -269,9 +314,13 @@ def sa_session(self): def create_static_job_with_state(self, state): """Create a job with unknown handler so its state won't change.""" sa_session = self.sa_session - hda = sa_session.scalars(select(model.HistoryDatasetAssociation)).all()[0] + stmt_hda = select(model.HistoryDatasetAssociation).where( + model.HistoryDatasetAssociation.history_id == self.hist_id + ) + hda = sa_session.scalars(stmt_hda).first() assert hda - history = sa_session.scalars(select(model.History)).all()[0] + stmt_history = select(model.History).where(model.History.id == self.hist_id) + history = sa_session.scalars(stmt_history).first() assert history user = sa_session.scalars(select(model.User)).all()[0] assert user