From 75e77fe5f52186899370b8bc2bd1e459ceb06a30 Mon Sep 17 00:00:00 2001 From: nowgnuesLee Date: Wed, 12 Feb 2025 13:46:37 +0900 Subject: [PATCH 1/3] feature: implement api for generating JWT from storage-proxy --- src/ai/backend/storage/api/manager.py | 90 ++++++++++++++++++++++----- 1 file changed, 75 insertions(+), 15 deletions(-) diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index 6db9070e5eb..51dcc429a7e 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -19,6 +19,7 @@ Awaitable, Callable, Iterator, + Literal, NotRequired, Optional, TypedDict, @@ -150,9 +151,11 @@ def handle_external_errors() -> Iterator[None]: yield except ExternalError as e: raise web.HTTPInternalServerError( - body=json.dumps({ - "msg": str(e), - }), + body=json.dumps( + { + "msg": str(e), + } + ), content_type="application/json", ) @@ -265,10 +268,14 @@ class Params(TypedDict): quota_usage = await volume.quota_model.describe_quota_scope(params["qsid"]) if not quota_usage: raise QuotaScopeNotFoundError - return web.json_response({ - "used_bytes": quota_usage.used_bytes if quota_usage.used_bytes >= 0 else None, - "limit_bytes": quota_usage.limit_bytes if quota_usage.limit_bytes >= 0 else None, - }) + return web.json_response( + { + "used_bytes": quota_usage.used_bytes if quota_usage.used_bytes >= 0 else None, + "limit_bytes": ( + quota_usage.limit_bytes if quota_usage.limit_bytes >= 0 else None + ), + } + ) async def update_quota_scope(request: web.Request) -> web.Response: @@ -875,15 +882,19 @@ class Params(TypedDict): vfid, exc_info=result_or_exception, ) - failed_results.append({ - "msg": repr(result_or_exception), - "item": str(relpath), - }) + failed_results.append( + { + "msg": repr(result_or_exception), + "item": str(relpath), + } + ) else: - success_results.append({ - "msg": None, - "item": str(relpath), - }) + success_results.append( + { + "msg": None, + "item": str(relpath), + } + ) results: ResultSet = { "success": success_results, "failed": failed_results, @@ -1060,6 +1071,54 @@ class Params(TypedDict): ) +async def download(request: web.Request) -> web.Response: + class Params(TypedDict): + volume: str + vfid: VFolderID + relpathList: list[PurePosixPath] + zip_name: str + format: Literal["zip"] + unmanaged_path: str | None + + async with cast( + AsyncContextManager[Params], + check_params( + request, + t.Dict( + { + t.Key("volume"): t.String(), + t.Key("vfid"): tx.VFolderID(), + t.Key("relpathList"): t.List(tx.PurePath(relative_only=True)), + t.Key("zip_name"): t.String(), + t.Key("format"): t.Enum("zip"), + t.Key("unmanaged_path", default=None): t.Null | t.String, + }, + ), + ), + ) as params: + await log_manager_api_entry(log, "download", params) + ctx: RootContext = request.app["ctx"] + token_data = { + "op": "download", + "volume": params["volume"], + "vfid": str(params["vfid"]), + "relpathList": [str(relpath) for relpath in params["relpathList"]], + "zip_name": params["zip_name"], + "format": params["format"], + "exp": datetime.utcnow() + ctx.local_config["storage-proxy"]["session-expire"], + } + token = jwt.encode( + token_data, + ctx.local_config["storage-proxy"]["secret"], + algorithm="HS256", + ) + return web.json_response( + { + "token": token, + } + ) + + async def create_upload_session(request: web.Request) -> web.Response: class Params(TypedDict): volume: str @@ -1195,6 +1254,7 @@ async def init_manager_app(ctx: RootContext) -> web.Application: app.router.add_route("POST", "/folder/file/move", move_file) app.router.add_route("POST", "/folder/file/fetch", fetch_file) app.router.add_route("POST", "/folder/file/download", create_download_session) + app.router.add_route("POST", "/v2/folder/file/download", download) app.router.add_route("POST", "/folder/file/upload", create_upload_session) app.router.add_route("POST", "/folder/file/delete", delete_files) From 0b530902a092aa104a270bf0ecbed35366cc9e22 Mon Sep 17 00:00:00 2001 From: nowgnuesLee Date: Wed, 12 Feb 2025 14:09:09 +0900 Subject: [PATCH 2/3] fix: apply ruff formating --- src/ai/backend/storage/api/manager.py | 48 ++++++++++----------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index 51dcc429a7e..7d2b26cc248 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -151,11 +151,9 @@ def handle_external_errors() -> Iterator[None]: yield except ExternalError as e: raise web.HTTPInternalServerError( - body=json.dumps( - { - "msg": str(e), - } - ), + body=json.dumps({ + "msg": str(e), + }), content_type="application/json", ) @@ -268,14 +266,10 @@ class Params(TypedDict): quota_usage = await volume.quota_model.describe_quota_scope(params["qsid"]) if not quota_usage: raise QuotaScopeNotFoundError - return web.json_response( - { - "used_bytes": quota_usage.used_bytes if quota_usage.used_bytes >= 0 else None, - "limit_bytes": ( - quota_usage.limit_bytes if quota_usage.limit_bytes >= 0 else None - ), - } - ) + return web.json_response({ + "used_bytes": quota_usage.used_bytes if quota_usage.used_bytes >= 0 else None, + "limit_bytes": (quota_usage.limit_bytes if quota_usage.limit_bytes >= 0 else None), + }) async def update_quota_scope(request: web.Request) -> web.Response: @@ -882,19 +876,15 @@ class Params(TypedDict): vfid, exc_info=result_or_exception, ) - failed_results.append( - { - "msg": repr(result_or_exception), - "item": str(relpath), - } - ) + failed_results.append({ + "msg": repr(result_or_exception), + "item": str(relpath), + }) else: - success_results.append( - { - "msg": None, - "item": str(relpath), - } - ) + success_results.append({ + "msg": None, + "item": str(relpath), + }) results: ResultSet = { "success": success_results, "failed": failed_results, @@ -1112,11 +1102,9 @@ class Params(TypedDict): ctx.local_config["storage-proxy"]["secret"], algorithm="HS256", ) - return web.json_response( - { - "token": token, - } - ) + return web.json_response({ + "token": token, + }) async def create_upload_session(request: web.Request) -> web.Response: From c963f49216cd0ca5a4f0cec2c0a59a835ed6459e Mon Sep 17 00:00:00 2001 From: nowgnuesLee Date: Wed, 19 Feb 2025 18:25:25 +0900 Subject: [PATCH 3/3] fix: fix parameter names and api address --- src/ai/backend/storage/api/manager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index 7d2b26cc248..8430b776ac0 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -1066,7 +1066,7 @@ class Params(TypedDict): volume: str vfid: VFolderID relpathList: list[PurePosixPath] - zip_name: str + filename: str format: Literal["zip"] unmanaged_path: str | None @@ -1079,7 +1079,7 @@ class Params(TypedDict): t.Key("volume"): t.String(), t.Key("vfid"): tx.VFolderID(), t.Key("relpathList"): t.List(tx.PurePath(relative_only=True)), - t.Key("zip_name"): t.String(), + t.Key("filename"): t.String(), t.Key("format"): t.Enum("zip"), t.Key("unmanaged_path", default=None): t.Null | t.String, }, @@ -1093,7 +1093,7 @@ class Params(TypedDict): "volume": params["volume"], "vfid": str(params["vfid"]), "relpathList": [str(relpath) for relpath in params["relpathList"]], - "zip_name": params["zip_name"], + "filename": params["filename"], "format": params["format"], "exp": datetime.utcnow() + ctx.local_config["storage-proxy"]["session-expire"], } @@ -1242,7 +1242,7 @@ async def init_manager_app(ctx: RootContext) -> web.Application: app.router.add_route("POST", "/folder/file/move", move_file) app.router.add_route("POST", "/folder/file/fetch", fetch_file) app.router.add_route("POST", "/folder/file/download", create_download_session) - app.router.add_route("POST", "/v2/folder/file/download", download) + app.router.add_route("POST", "/folder/file/download-multi", download) app.router.add_route("POST", "/folder/file/upload", create_upload_session) app.router.add_route("POST", "/folder/file/delete", delete_files)