Skip to content

Commit bf58318

Browse files
Add First-Class Gemini SDK Integration to Contrib
# Temporal Integration for the Google Gemini SDK This adds a first-class integration that lets users call the Gemini SDK's `AsyncClient` directly from within Temporal workflows. Every API call and tool invocation becomes a durable Temporal activity — giving full crash recovery, visibility in workflow event history, and replay safety — while keeping credentials entirely on the worker side. ## How it works The integration shims three layers of the Gemini SDK so that workflows can use `client.models`, `client.files`, `client.file_search_stores`, `client.chats`, and all other SDK modules naturally: ### `TemporalApiClient` (`_temporal_api_client.py`) A `BaseApiClient` subclass that replaces the SDK's HTTP layer. Instead of making network calls, `async_request` and `async_request_streamed` serialize the request and dispatch it through `workflow.execute_activity`. The real HTTP call happens inside the activity on the worker, where the actual `genai.Client` with real credentials lives. Sync methods raise immediately. Per-request `http_options` are validated (non-serializable fields like `httpx_client` are rejected), and `timeout` is mapped to Temporal's `start_to_close_timeout`. ### `TemporalAsyncFiles` / `TemporalAsyncFileSearchStores` (`_temporal_files.py`, `_temporal_file_search_stores.py`) Subclasses of `AsyncFiles` and `AsyncFileSearchStores` that override `upload`, `download`, `register_files`, and `upload_to_file_search_store` to dispatch the entire operation as a Temporal activity. This avoids filesystem access (`os` module) and credential token refresh in the workflow sandbox. Methods like `get`, `delete`, `list` are inherited and work through the `TemporalApiClient`'s `async_request` activity. File uploads accept `str` paths (resolved on the worker), `os.PathLike`, or `io.IOBase` (bytes serialized across the activity boundary). ### `TemporalAsyncClient` (`_temporal_async_client.py`) An `AsyncClient` subclass that wires in `TemporalAsyncFiles` and `TemporalAsyncFileSearchStores`. All other SDK modules (`models`, `tunings`, `caches`, `batches`, `live`, `tokens`, `operations`) are inherited unchanged since they only use `async_request` under the hood. ### `GeminiPlugin` (`_gemini_plugin.py`) A `SimplePlugin` that registers all activities, configures the Pydantic data converter, and passes `google.genai` through the workflow sandbox. Users pass a fully configured `genai.Client` — the plugin never constructs one itself. An optional `extra_credentials` parameter supports operations like `register_files` that need separate GCS credentials. ### `activity_as_tool` (`workflow.py`) Wraps any `@activity.defn` function so it looks like a regular async callable to Gemini's automatic function calling (AFC). When the model decides to call the tool, the SDK invokes the wrapper, which dispatches through `workflow.execute_activity`. Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity. ### Batched streaming `generate_content_stream` is supported via a batched approach: the `async_request_streamed` activity collects all chunks from the real streaming response and returns them as a list. The workflow-side `TemporalApiClient` yields them back as an async generator so the SDK sees the expected interface. ## Usage ```python # Worker side client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) plugin = GeminiPlugin(client) # Workflow side @workflow.defn class MyWorkflow: @workflow.run async def run(self, query: str) -> str: client = gemini_client() response = await client.models.generate_content( model="gemini-2.5-flash", contents=query, config=types.GenerateContentConfig( tools=[activity_as_tool(my_tool)], ), ) return response.text ``` ## Testing 31 integration tests covering: - Basic `generate_content` and multi-chunk streaming - AFC tool calling (single-arg, multi-arg, workflow methods, sequential multi-tool, failure propagation) - Per-request `http_options` propagation (headers, api_version, base_url) - File upload via str path and `io.BytesIO`, file download - File search store upload - Multi-turn chat via `client.chats` - `TemporalAsyncClient` wiring verification - `TemporalApiClient` error paths (sync raises, low-level upload/download raises) - `activity_as_tool` validation and signature preservation - A full end-to-end integration test that exercises all real activity implementations (generate, stream, file upload, download, store upload, RAG query, store delete) with a mocked `genai.Client` — ensuring the actual activity code in `_gemini_activity.py` is covered, not just the workflow-side shims.
1 parent 0522490 commit bf58318

14 files changed

Lines changed: 4148 additions & 1706 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ aioboto3 = [
4343
"aioboto3>=10.4.0",
4444
"types-aioboto3[s3]>=10.4.0",
4545
]
46+
google-gemini = [
47+
"google-genai>=1.66.0",
48+
]
4649

4750
[project.urls]
4851
Homepage = "https://github.com/temporalio/sdk-python"
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""First-class Temporal integration for the Google Gemini SDK.
2+
3+
.. warning::
4+
This module is experimental and may change in future versions.
5+
Use with caution in production environments.
6+
7+
This integration lets you use the Gemini SDK's async client with full
8+
automatic function calling (AFC) support, where every API call and every
9+
tool invocation is a **durable Temporal activity**.
10+
11+
No credentials are fetched in the workflow, and no auth material appears in
12+
Temporal's event history.
13+
14+
- :class:`GeminiPlugin` — registers the ``gemini_api_client_async_request``
15+
activity using a caller-provided ``genai.Client`` on the worker side.
16+
- :func:`gemini_client` — call from a workflow to get an ``AsyncClient``
17+
that routes API calls through activities.
18+
- :func:`activity_as_tool` — convert any ``@activity.defn`` function into a
19+
Gemini tool callable; Gemini's AFC invokes it as a Temporal activity.
20+
21+
Quickstart::
22+
23+
# ---- worker setup (outside sandbox) ----
24+
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
25+
plugin = GeminiPlugin(client)
26+
27+
@activity.defn
28+
async def get_weather(state: str) -> str: ...
29+
30+
# ---- workflow (sandbox-safe) ----
31+
@workflow.defn
32+
class AgentWorkflow:
33+
@workflow.run
34+
async def run(self, query: str) -> str:
35+
client = gemini_client()
36+
response = await client.models.generate_content(
37+
model="gemini-2.5-flash",
38+
contents=query,
39+
config=types.GenerateContentConfig(
40+
tools=[
41+
activity_as_tool(
42+
get_weather,
43+
start_to_close_timeout=timedelta(seconds=30),
44+
),
45+
],
46+
),
47+
)
48+
return response.text
49+
"""
50+
51+
from __future__ import annotations
52+
53+
from temporalio.contrib.google_gemini_sdk._gemini_plugin import GeminiPlugin
54+
from temporalio.contrib.google_gemini_sdk.workflow import (
55+
activity_as_tool,
56+
gemini_client,
57+
)
58+
59+
__all__ = [
60+
"GeminiPlugin",
61+
"activity_as_tool",
62+
"gemini_client",
63+
]
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
"""Temporal activity that executes Gemini SDK API calls with real credentials.
2+
3+
The ``TemporalApiClient`` in the workflow dispatches calls here. This
4+
activity holds a user-provided ``genai.Client`` and forwards structured
5+
requests. Credentials are fetched/refreshed only within the activity —
6+
they never appear in workflow event history.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from collections.abc import Sequence
12+
from typing import Any, Callable
13+
14+
import google.auth.credentials
15+
from google.genai import Client as GeminiClient
16+
from google.genai import types
17+
from google.genai.types import HttpOptions
18+
from google.genai.types import HttpResponse as SdkHttpResponse
19+
20+
from temporalio import activity
21+
from temporalio.contrib.google_gemini_sdk._models import (
22+
_GeminiApiRequest,
23+
_GeminiApiResponse,
24+
_GeminiApiStreamedResponse,
25+
_GeminiDownloadFileRequest,
26+
_GeminiRegisterFilesRequest,
27+
_GeminiUploadFileRequest,
28+
_GeminiUploadToFileSearchStoreRequest,
29+
)
30+
31+
32+
def _resolve_http_options(
33+
overrides: Any,
34+
) -> HttpOptions | None:
35+
"""Reconstruct ``HttpOptions`` from serializable overrides, or None."""
36+
if overrides is None:
37+
return None
38+
return HttpOptions.model_validate(overrides.model_dump(exclude_none=True))
39+
40+
41+
class GeminiApiCaller:
42+
"""Wraps a ``genai.Client`` and exposes Temporal activities for SDK calls.
43+
44+
The caller owns a reference to the user-provided ``genai.Client``.
45+
All credential management, HTTP client configuration, etc. is the
46+
responsibility of whoever constructs the client.
47+
"""
48+
49+
def __init__(
50+
self,
51+
client: GeminiClient,
52+
credentials: google.auth.credentials.Credentials | None = None,
53+
) -> None:
54+
"""Initialize with a genai.Client and optional extra credentials."""
55+
self._client = client
56+
self._credentials = credentials
57+
58+
def activities(self) -> Sequence[Callable]:
59+
"""Return activities that route SDK calls through this client."""
60+
61+
@activity.defn(name="gemini_api_client_async_request")
62+
async def gemini_api_client_async_request(
63+
req: _GeminiApiRequest,
64+
) -> _GeminiApiResponse:
65+
"""Execute a Gemini SDK API call with real credentials."""
66+
response: SdkHttpResponse = (
67+
await self._client.aio._api_client.async_request(
68+
http_method=req.http_method,
69+
path=req.path,
70+
request_dict=req.request_dict,
71+
http_options=_resolve_http_options(req.http_options_overrides),
72+
)
73+
)
74+
return _GeminiApiResponse(
75+
headers=response.headers or {},
76+
body=response.body or "",
77+
)
78+
79+
@activity.defn(name="gemini_api_client_async_request_streamed")
80+
async def gemini_api_client_async_request_streamed(
81+
req: _GeminiApiRequest,
82+
) -> _GeminiApiStreamedResponse:
83+
"""Execute a streamed Gemini SDK API call, collecting all chunks."""
84+
stream = await self._client.aio._api_client.async_request_streamed(
85+
http_method=req.http_method,
86+
path=req.path,
87+
request_dict=req.request_dict,
88+
http_options=_resolve_http_options(req.http_options_overrides),
89+
)
90+
chunks = []
91+
async for chunk in stream:
92+
chunks.append(
93+
_GeminiApiResponse(
94+
headers=chunk.headers or {},
95+
body=chunk.body or "",
96+
)
97+
)
98+
return _GeminiApiStreamedResponse(chunks=chunks)
99+
100+
@activity.defn(name="gemini_files_upload")
101+
async def gemini_files_upload(
102+
req: _GeminiUploadFileRequest,
103+
) -> types.File:
104+
"""Upload a file using the real genai.Client on the worker."""
105+
if req.file_bytes is not None:
106+
import io
107+
108+
file_arg: Any = io.BytesIO(req.file_bytes)
109+
else:
110+
file_arg = req.file_path
111+
112+
return await self._client.aio.files.upload(file=file_arg, config=req.config)
113+
114+
@activity.defn(name="gemini_files_download")
115+
async def gemini_files_download(
116+
req: _GeminiDownloadFileRequest,
117+
) -> bytes:
118+
"""Download a file using the real genai.Client on the worker."""
119+
return await self._client.aio.files.download(
120+
file=req.file, config=req.config
121+
)
122+
123+
@activity.defn(name="gemini_files_register")
124+
async def gemini_files_register(
125+
req: _GeminiRegisterFilesRequest,
126+
) -> types.RegisterFilesResponse:
127+
"""Register GCS files using the real genai.Client on the worker.
128+
129+
Uses ``credentials`` if provided at plugin init,
130+
otherwise falls back to the client's own credentials.
131+
Token refresh happens here on the worker side, so no auth
132+
material enters the workflow event history.
133+
"""
134+
auth = self._credentials or self._client._api_client._credentials
135+
if auth is None:
136+
raise ValueError(
137+
"No credentials available for register_files(). "
138+
"Pass extra_credentials to GeminiPlugin or initialize "
139+
"the genai.Client with credentials."
140+
)
141+
return await self._client.aio.files.register_files(
142+
auth=auth,
143+
uris=req.uris,
144+
config=req.config,
145+
)
146+
147+
@activity.defn(name="gemini_file_search_stores_upload")
148+
async def gemini_file_search_stores_upload(
149+
req: _GeminiUploadToFileSearchStoreRequest,
150+
) -> types.UploadToFileSearchStoreOperation:
151+
"""Upload a file to a file search store on the worker."""
152+
if req.file_bytes is not None:
153+
import io
154+
155+
file_arg: Any = io.BytesIO(req.file_bytes)
156+
else:
157+
file_arg = req.file_path
158+
159+
return (
160+
await self._client.aio.file_search_stores.upload_to_file_search_store(
161+
file_search_store_name=req.file_search_store_name,
162+
file=file_arg,
163+
config=req.config,
164+
)
165+
)
166+
167+
return [
168+
gemini_api_client_async_request,
169+
gemini_api_client_async_request_streamed,
170+
gemini_files_upload,
171+
gemini_files_download,
172+
gemini_files_register,
173+
gemini_file_search_stores_upload,
174+
]
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""Temporal plugin for Google Gemini SDK integration."""
2+
3+
from __future__ import annotations
4+
5+
import dataclasses
6+
7+
import google.auth.credentials
8+
from google.genai import Client as GeminiClient
9+
10+
from temporalio.contrib.google_gemini_sdk._gemini_activity import GeminiApiCaller
11+
from temporalio.contrib.pydantic import PydanticPayloadConverter
12+
from temporalio.converter import DataConverter, DefaultPayloadConverter
13+
from temporalio.plugin import SimplePlugin
14+
from temporalio.worker import WorkflowRunner
15+
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
16+
17+
18+
def _data_converter(converter: DataConverter | None) -> DataConverter:
19+
if converter is None:
20+
return DataConverter(payload_converter_class=PydanticPayloadConverter)
21+
elif converter.payload_converter_class is DefaultPayloadConverter:
22+
return dataclasses.replace(
23+
converter, payload_converter_class=PydanticPayloadConverter
24+
)
25+
return converter
26+
27+
28+
class GeminiPlugin(SimplePlugin):
29+
"""A Temporal Worker Plugin configured for the Google Gemini SDK.
30+
31+
.. warning::
32+
This class is experimental and may change in future versions.
33+
Use with caution in production environments.
34+
35+
This plugin registers the ``gemini_api_client_async_request`` activity
36+
using the provided ``genai.Client`` with real credentials. Workflows use
37+
:func:`~temporalio.contrib.google_gemini_sdk.workflow.gemini_client` to
38+
get an ``AsyncClient`` backed by a ``TemporalApiClient`` that routes all
39+
API calls through this activity.
40+
41+
No credentials are passed to or from the workflow. Auth material never
42+
appears in Temporal's event history.
43+
44+
Example (Gemini Developer API)::
45+
46+
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
47+
plugin = GeminiPlugin(client)
48+
49+
Example (Vertex AI)::
50+
51+
client = genai.Client(
52+
vertexai=True, project="my-project", location="us-central1",
53+
)
54+
plugin = GeminiPlugin(client)
55+
56+
Example (with separate GCS credentials for file registration)::
57+
58+
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
59+
gcs_creds, _ = google.auth.default()
60+
plugin = GeminiPlugin(client, extra_credentials=gcs_creds)
61+
"""
62+
63+
def __init__(
64+
self,
65+
client: GeminiClient,
66+
extra_credentials: google.auth.credentials.Credentials | None = None,
67+
) -> None:
68+
"""Initialize the Gemini plugin.
69+
70+
Args:
71+
client: A fully configured ``genai.Client`` instance.
72+
All credential management, HTTP client configuration, etc.
73+
is the responsibility of the caller.
74+
extra_credentials: Optional Google Cloud credentials used for
75+
operations that require explicit auth (e.g.
76+
``files.register_files()``). If not provided, the
77+
client's own credentials are used.
78+
"""
79+
self._api_caller = GeminiApiCaller(client, credentials=extra_credentials)
80+
81+
def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
82+
if not runner:
83+
raise ValueError("No WorkflowRunner provided to GeminiPlugin.")
84+
if isinstance(runner, SandboxedWorkflowRunner):
85+
return dataclasses.replace(
86+
runner,
87+
restrictions=runner.restrictions.with_passthrough_modules(
88+
"google.genai"
89+
),
90+
)
91+
return runner
92+
93+
super().__init__(
94+
name="GeminiPlugin",
95+
data_converter=_data_converter,
96+
activities=self._api_caller.activities(),
97+
workflow_runner=workflow_runner,
98+
)

0 commit comments

Comments
 (0)