Skip to content

Commit 0fb8300

Browse files
Add file upload/download support via TemporalAsyncFiles and TemporalAsyncClient
File operations (upload, download) now run entirely inside Temporal activities using the real genai.Client on the worker side, avoiding filesystem and OS module access in the workflow sandbox. - Add TemporalAsyncFiles (_temporal_files.py): subclasses AsyncFiles, overriding upload() and download() to dispatch through activities. Supports str paths, os.PathLike, and io.IOBase inputs. Validates http_options for non-serializable fields before crossing the activity boundary. - Add TemporalAsyncClient (_temporal_async_client.py): subclasses AsyncClient, wiring in TemporalAsyncFiles. gemini_client() now returns this instead of plain AsyncClient. - Add gemini_files_upload and gemini_files_download activities. - Include HTTP method in activity summaries (e.g. "POST files").
1 parent a3fae4c commit 0fb8300

5 files changed

Lines changed: 251 additions & 28 deletions

File tree

temporalio/contrib/google_gemini_sdk/_gemini_activity.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from collections.abc import Sequence
1212
from typing import Any, Callable
1313

14-
from google.genai import Client as GeminiClient
14+
from google.genai import Client as GeminiClient, types
1515
from google.genai.types import HttpOptions
1616
from google.genai.types import HttpResponse as SdkHttpResponse
1717

@@ -20,6 +20,8 @@
2020
_GeminiApiRequest,
2121
_GeminiApiResponse,
2222
_GeminiApiStreamedResponse,
23+
_GeminiDownloadFileRequest,
24+
_GeminiUploadFileRequest,
2325
)
2426

2527

@@ -85,7 +87,34 @@ async def gemini_api_client_async_request_streamed(
8587
)
8688
return _GeminiApiStreamedResponse(chunks=chunks)
8789

90+
@activity.defn(name="gemini_files_upload")
91+
async def gemini_files_upload(
92+
req: _GeminiUploadFileRequest,
93+
) -> types.File:
94+
"""Upload a file using the real genai.Client on the worker."""
95+
if req.file_bytes is not None:
96+
import io
97+
98+
file_arg: Any = io.BytesIO(req.file_bytes)
99+
else:
100+
file_arg = req.file_path
101+
102+
return await self._client.aio.files.upload(
103+
file=file_arg, config=req.config
104+
)
105+
106+
@activity.defn(name="gemini_files_download")
107+
async def gemini_files_download(
108+
req: _GeminiDownloadFileRequest,
109+
) -> bytes:
110+
"""Download a file using the real genai.Client on the worker."""
111+
return await self._client.aio.files.download(
112+
file=req.file, config=req.config
113+
)
114+
88115
return [
89116
gemini_api_client_async_request,
90117
gemini_api_client_async_request_streamed,
118+
gemini_files_upload,
119+
gemini_files_download,
91120
]

temporalio/contrib/google_gemini_sdk/_temporal_api_client.py

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"""Temporal-aware BaseApiClient that routes SDK calls through activities.
22
3-
This module provides ``TemporalApiClient``, a minimal ``BaseApiClient``
4-
subclass whose ``async_request()`` dispatches through a Temporal activity
5-
instead of making direct HTTP calls. The real ``genai.Client`` with real
6-
credentials only exists on the worker side inside the activity.
3+
This module provides ``TemporalApiClient``, a ``BaseApiClient`` subclass
4+
whose HTTP methods dispatch through Temporal activities instead of making
5+
direct calls. The real ``genai.Client`` with real credentials only exists
6+
on the worker side inside the activity.
77
88
This ensures:
99
- No credential fetching or refreshing happens in the workflow.
@@ -17,6 +17,7 @@
1717
from datetime import timedelta
1818
from typing import Any, Optional
1919

20+
from google.genai import types
2021
from google.genai._api_client import BaseApiClient
2122
from google.genai.types import HttpOptions, HttpOptionsOrDict
2223
from google.genai.types import HttpResponse as SdkHttpResponse
@@ -55,6 +56,22 @@ class _SerializableHttpOptions(BaseModel):
5556
)
5657

5758

59+
def _validate_http_options(http_options: Optional[HttpOptions]) -> None:
60+
"""Raise if http_options contains non-serializable fields."""
61+
if http_options is None:
62+
return
63+
bad_fields = [
64+
f
65+
for f in _REJECTED_HTTP_OPTION_FIELDS
66+
if getattr(http_options, f, None) is not None
67+
]
68+
if bad_fields:
69+
raise ValueError(
70+
f"http_options cannot include {bad_fields}. "
71+
f"Configure custom HTTP clients at GeminiPlugin init instead."
72+
)
73+
74+
5875
# ── async_request models ──────────────────────────────────────────────────
5976

6077
class _GeminiApiRequest(BaseModel):
@@ -83,6 +100,28 @@ class _GeminiApiStreamedResponse(BaseModel):
83100
chunks: list[_GeminiApiResponse]
84101

85102

103+
# ── files upload/download models ──────────────────────────────────────────
104+
105+
106+
class _GeminiUploadFileRequest(BaseModel):
107+
"""Serializable activity input for a file upload.
108+
109+
For file path uploads the path is resolved on the worker. For
110+
in-memory uploads the raw bytes are sent across the activity boundary.
111+
"""
112+
113+
file_bytes: bytes | None = None
114+
file_path: str | None = None
115+
config: types.UploadFileConfig | None = None
116+
117+
118+
class _GeminiDownloadFileRequest(BaseModel):
119+
"""Serializable activity input for a file download."""
120+
121+
file: str
122+
config: types.DownloadFileConfig | None = None
123+
124+
86125
class TemporalApiClient(BaseApiClient):
87126
"""A ``BaseApiClient`` that routes all API calls through Temporal activities.
88127
@@ -157,16 +196,7 @@ def _process_http_options(
157196
else:
158197
opts = HttpOptions.model_validate(http_options)
159198

160-
bad_fields = [
161-
f
162-
for f in _REJECTED_HTTP_OPTION_FIELDS
163-
if getattr(opts, f, None) is not None
164-
]
165-
if bad_fields:
166-
raise ValueError(
167-
f"Per-request http_options cannot include {bad_fields}. "
168-
f"Configure custom HTTP clients at GeminiPlugin init instead."
169-
)
199+
_validate_http_options(opts)
170200

171201
# timeout is owned by Temporal — apply it to the activity config
172202
# rather than forwarding to the underlying HTTP client.
@@ -201,7 +231,7 @@ async def async_request(
201231
config: ActivityConfig = {**self._activity_config}
202232
if "summary" not in config:
203233
# Default summary is the API path (e.g. "models/gemini-2.5-flash:generateContent").
204-
config["summary"] = path
234+
config["summary"] = f"{http_method.upper()} {path}"
205235
overrides = self._process_http_options(http_options, config)
206236

207237
resp = await temporal_workflow.execute_activity(
@@ -252,7 +282,7 @@ async def async_request_streamed(
252282
) -> Any:
253283
config: ActivityConfig = {**self._activity_config}
254284
if "summary" not in config:
255-
config["summary"] = path
285+
config["summary"] = f"{http_method.upper()} {path}"
256286
overrides = self._process_http_options(http_options, config)
257287

258288
resp = await temporal_workflow.execute_activity(
@@ -273,29 +303,29 @@ async def _yield_chunks():
273303

274304
return _yield_chunks()
275305

276-
# ── File upload/download (not supported) ─────────────────────────────
277-
# These methods are only used by the SDK's file_search_stores module,
278-
# which is not workflow-safe because it uses the OS module for file
279-
# I/O. Until we explicitly add support for the file_search_stores module
280-
# operations as activities, raise early so callers get a clear error rather
281-
# than a crash from missing HTTP clients.
306+
# ── File upload/download ─────────────────────────────────────────────
307+
# File operations are handled at a higher level by TemporalAsyncFiles
308+
# (in _temporal_files.py), which dispatches the entire upload/download
309+
# as a Temporal activity using the real client on the worker side.
310+
# These internal BaseApiClient methods are not called in that path,
311+
# so we raise here to catch any unexpected direct usage.
282312

283313
def upload_file(self, *args: Any, **kwargs: Any) -> Any:
284314
raise NotImplementedError(
285-
"File uploads are not yet supported in the Temporal Gemini integration."
315+
"Use client.files.upload() instead of the internal upload_file() method."
286316
)
287317

288318
async def async_upload_file(self, *args: Any, **kwargs: Any) -> Any:
289319
raise NotImplementedError(
290-
"File uploads are not yet supported in the Temporal Gemini integration."
320+
"Use client.files.upload() instead of the internal async_upload_file() method."
291321
)
292322

293323
def download_file(self, *args: Any, **kwargs: Any) -> Any:
294324
raise NotImplementedError(
295-
"File downloads are not yet supported in the Temporal Gemini integration."
325+
"Use client.files.download() instead of the internal download_file() method."
296326
)
297327

298328
async def async_download_file(self, *args: Any, **kwargs: Any) -> Any:
299329
raise NotImplementedError(
300-
"File downloads are not yet supported in the Temporal Gemini integration."
330+
"Use client.files.download() instead of the internal async_download_file() method."
301331
)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Temporal-aware AsyncClient shim.
2+
3+
``TemporalAsyncClient`` is an ``AsyncClient`` subclass that wires up
4+
``TemporalAsyncFiles`` in place of the default ``AsyncFiles``.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from google.genai.client import AsyncClient
10+
11+
from temporalio.workflow import ActivityConfig
12+
13+
from temporalio.contrib.google_gemini_sdk._temporal_api_client import (
14+
TemporalApiClient,
15+
)
16+
from temporalio.contrib.google_gemini_sdk._temporal_files import (
17+
TemporalAsyncFiles,
18+
)
19+
20+
21+
class TemporalAsyncClient(AsyncClient):
22+
"""``AsyncClient`` subclass that uses Temporal-aware modules.
23+
24+
Replaces ``AsyncFiles`` with ``TemporalAsyncFiles`` so that file
25+
upload/download operations run entirely inside Temporal activities.
26+
Other modules (models, caches, etc.) are inherited unchanged and
27+
work through ``TemporalApiClient``'s activity-backed HTTP methods.
28+
"""
29+
30+
def __init__(
31+
self,
32+
api_client: TemporalApiClient,
33+
activity_config: ActivityConfig | None = None,
34+
) -> None:
35+
super().__init__(api_client)
36+
self._files = TemporalAsyncFiles(api_client, activity_config)
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
"""Temporal-aware AsyncFiles shim.
2+
3+
``TemporalAsyncFiles`` is an ``AsyncFiles`` subclass whose ``upload``
4+
and ``download`` methods dispatch through Temporal activities so the
5+
entire file operation (including filesystem access) runs on the
6+
activity worker.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import io
12+
from datetime import timedelta
13+
from typing import Optional, Union
14+
15+
from google.genai import types
16+
from google.genai.files import AsyncFiles
17+
from google.genai.types import HttpOptions
18+
19+
from temporalio import workflow as temporal_workflow
20+
from temporalio.workflow import ActivityConfig
21+
22+
from temporalio.contrib.google_gemini_sdk._temporal_api_client import (
23+
TemporalApiClient,
24+
_GeminiDownloadFileRequest,
25+
_GeminiUploadFileRequest,
26+
_validate_http_options,
27+
)
28+
29+
30+
class TemporalAsyncFiles(AsyncFiles):
31+
"""``AsyncFiles`` subclass that routes ``upload`` and ``download`` through activities.
32+
33+
The entire file operation — including filesystem access, resumable
34+
upload negotiation, and chunked transfer — runs inside a Temporal
35+
activity on the worker. ``get``, ``delete``, and ``list`` are
36+
inherited from ``AsyncFiles`` and already work through the
37+
``TemporalApiClient``'s ``async_request`` activity.
38+
"""
39+
40+
def __init__(
41+
self,
42+
api_client: TemporalApiClient,
43+
activity_config: ActivityConfig | None = None,
44+
) -> None:
45+
super().__init__(api_client)
46+
self._activity_config = activity_config or ActivityConfig(
47+
start_to_close_timeout=timedelta(seconds=60),
48+
)
49+
50+
async def upload(
51+
self,
52+
*,
53+
file: Union[str, "os.PathLike[str]", io.IOBase],
54+
config: Optional[types.UploadFileConfigOrDict] = None,
55+
) -> types.File:
56+
"""Upload a file via a Temporal activity.
57+
58+
Accepts a file path (resolved on the worker), ``os.PathLike``, or
59+
an ``io.IOBase`` (bytes sent across the activity boundary).
60+
"""
61+
act_config: ActivityConfig = {**self._activity_config}
62+
if "summary" not in act_config:
63+
act_config["summary"] = "files.upload"
64+
65+
upload_config = None
66+
if config is not None:
67+
if isinstance(config, dict):
68+
upload_config = types.UploadFileConfig.model_validate(config)
69+
else:
70+
upload_config = config
71+
_validate_http_options(upload_config.http_options)
72+
73+
if isinstance(file, io.IOBase):
74+
req = _GeminiUploadFileRequest(
75+
file_bytes=file.read(), config=upload_config
76+
)
77+
elif isinstance(file, str):
78+
req = _GeminiUploadFileRequest(
79+
file_path=file, config=upload_config
80+
)
81+
else:
82+
# os.PathLike — convert via __fspath__() to avoid importing os
83+
req = _GeminiUploadFileRequest(
84+
file_path=file.__fspath__(), config=upload_config
85+
)
86+
87+
return await temporal_workflow.execute_activity(
88+
"gemini_files_upload",
89+
req,
90+
result_type=types.File,
91+
**act_config,
92+
)
93+
94+
async def download(
95+
self,
96+
*,
97+
file: Union[str, types.File],
98+
config: Optional[types.DownloadFileConfigOrDict] = None,
99+
) -> bytes:
100+
"""Download a file via a Temporal activity."""
101+
act_config: ActivityConfig = {**self._activity_config}
102+
if "summary" not in act_config:
103+
act_config["summary"] = "files.download"
104+
105+
download_config = None
106+
if config is not None:
107+
if isinstance(config, dict):
108+
download_config = types.DownloadFileConfig.model_validate(config)
109+
else:
110+
download_config = config
111+
_validate_http_options(download_config.http_options)
112+
113+
if isinstance(file, types.File):
114+
if not file.name:
115+
raise ValueError("File object must have a name to download.")
116+
file_name = file.name
117+
else:
118+
file_name = file
119+
120+
return await temporal_workflow.execute_activity(
121+
"gemini_files_download",
122+
_GeminiDownloadFileRequest(file=file_name, config=download_config),
123+
result_type=bytes,
124+
**act_config,
125+
)

temporalio/contrib/google_gemini_sdk/workflow.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
from temporalio.contrib.google_gemini_sdk._temporal_api_client import (
2525
TemporalApiClient,
2626
)
27+
from temporalio.contrib.google_gemini_sdk._temporal_async_client import (
28+
TemporalAsyncClient,
29+
)
2730
from temporalio.exceptions import ApplicationError
2831
from temporalio.workflow import ActivityConfig
2932

@@ -181,4 +184,4 @@ async def run(self, query: str) -> str:
181184
location=location,
182185
activity_config=activity_config,
183186
)
184-
return AsyncClient(temporal_api_client)
187+
return TemporalAsyncClient(temporal_api_client, activity_config)

0 commit comments

Comments
 (0)