Skip to content

Commit f929151

Browse files
authored
Merge branch 'main' into lambda-worker
2 parents 113aa18 + 6b8e441 commit f929151

55 files changed

Lines changed: 2442 additions & 260 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,14 @@ Some examples require extra dependencies. See each sample's directory for specif
7575
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
7676
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
7777
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
78+
* [Nexus Messaging](nexus_messaging): Demonstrates how send signal, update and query messages through Nexus.
79+
This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus
80+
and sends messages to it.
7881
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
7982
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
8083
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
8184
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
85+
* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental**
8286
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
8387
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
8488
* [sentry](sentry) - Report errors to Sentry.

nexus_messaging/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus
2+
operations. There are two self-contained examples, each in its own directory:
3+
4+
| | `callerpattern/` | `ondemandpattern/` |
5+
|--------------------------------|--------------------------------------|--------------------------------------------------------------|
6+
| **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them |
7+
| **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
8+
| **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
9+
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |
10+
11+
Each directory is fully self-contained for clarity. The `GreetingWorkflow`, activity, and
12+
`Language` enum are **identical** between the two -- only the Nexus service definition and its
13+
handler implementation differ. This highlights that the same Workflow can be exposed through
14+
Nexus in different ways depending on whether the caller needs lifecycle control.
15+
16+
See each directory's README for running instructions.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## Caller pattern
2+
3+
The handler worker starts a `GreetingWorkflow` for a User ID.
4+
`NexusGreetingServiceHandler` holds that ID and routes every Nexus operation to it.
5+
The caller's input does not have that Workflow ID as the caller doesn't know it -- but the caller
6+
sends in the User ID, and `NexusGreetingServiceHandler` knows how to get the desired Workflow ID
7+
from that User ID (see the `get_workflow_id` call).
8+
9+
The handler worker uses the same `get_workflow_id` call to generate a Workflow ID from a Wser ID
10+
when it launches the Workflow.
11+
12+
The caller Workflow:
13+
1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`)
14+
2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity)
15+
3. Confirms the change via a second query (`get_language`)
16+
4. Approves the Workflow (`approve` -- backed by a `@workflow.signal`)
17+
18+
### Running
19+
20+
Start a Temporal server:
21+
22+
```bash
23+
temporal server start-dev
24+
```
25+
26+
Create the namespaces and Nexus endpoint:
27+
28+
```bash
29+
temporal operator namespace create --namespace nexus-messaging-handler-namespace
30+
temporal operator namespace create --namespace nexus-messaging-caller-namespace
31+
32+
temporal operator nexus endpoint create \
33+
--name nexus-messaging-nexus-endpoint \
34+
--target-namespace nexus-messaging-handler-namespace \
35+
--target-task-queue nexus-messaging-handler-task-queue
36+
```
37+
38+
In one terminal, start the handler worker:
39+
40+
```bash
41+
uv run python -m nexus_messaging.callerpattern.handler.worker
42+
```
43+
44+
In another terminal, run the following command to start the example:
45+
46+
```bash
47+
uv run python -m nexus_messaging.callerpattern.caller.app
48+
```
49+
50+
Expected output:
51+
52+
```
53+
Supported languages: [<Language.CHINESE: 2>, <Language.ENGLISH: 3>]
54+
Language changed: ENGLISH -> ARABIC
55+
Workflow approved
56+
```
File renamed without changes.

nexus_sync_operations/caller/app.py renamed to nexus_messaging/callerpattern/caller/app.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
from temporalio.envconfig import ClientConfig
77
from temporalio.worker import Worker
88

9-
from nexus_sync_operations.caller.workflows import CallerWorkflow
9+
from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow
1010

11-
NAMESPACE = "nexus-sync-operations-caller-namespace"
12-
TASK_QUEUE = "nexus-sync-operations-caller-task-queue"
11+
NAMESPACE = "nexus-messaging-caller-namespace"
12+
TASK_QUEUE = "nexus-messaging-caller-task-queue"
1313

1414

15-
async def execute_caller_workflow(
16-
client: Optional[Client] = None,
17-
) -> None:
15+
async def execute_caller_workflow(client: Optional[Client] = None) -> None:
1816
if client is None:
1917
config = ClientConfig.load_client_connect_config()
2018
config.setdefault("target_host", "localhost:7233")
@@ -28,7 +26,8 @@ async def execute_caller_workflow(
2826
):
2927
log = await client.execute_workflow(
3028
CallerWorkflow.run,
31-
id=str(uuid.uuid4()),
29+
arg="user-1",
30+
id=f"nexus-messaging-caller-{uuid.uuid4()}",
3231
task_queue=TASK_QUEUE,
3332
)
3433
for line in log:
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""
2+
A caller workflow that executes Nexus operations. The caller does not have information
3+
about how these operations are implemented by the Nexus service.
4+
"""
5+
6+
from temporalio import workflow
7+
from temporalio.exceptions import ApplicationError
8+
9+
from nexus_messaging.callerpattern.service import (
10+
ApproveInput,
11+
GetLanguageInput,
12+
GetLanguagesInput,
13+
Language,
14+
NexusGreetingService,
15+
SetLanguageInput,
16+
)
17+
18+
NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"
19+
20+
21+
@workflow.defn
22+
class CallerWorkflow:
23+
@workflow.run
24+
async def run(self, user_id: str) -> list[str]:
25+
log: list[str] = []
26+
nexus_client = workflow.create_nexus_client(
27+
service=NexusGreetingService,
28+
endpoint=NEXUS_ENDPOINT,
29+
)
30+
31+
# Call a Nexus operation backed by a query against the entity workflow.
32+
# The workflow must already be running on the handler, otherwise you will
33+
# get an error saying the workflow has already terminated.
34+
languages_output = await nexus_client.execute_operation(
35+
NexusGreetingService.get_languages,
36+
GetLanguagesInput(include_unsupported=False, user_id=user_id),
37+
)
38+
log.append(f"Supported languages: {languages_output.languages}")
39+
workflow.logger.info("Supported languages: %s", languages_output.languages)
40+
41+
# Following are examples for each of the three messaging types -
42+
# update, query, then signal.
43+
44+
# Call a Nexus operation backed by an update against the entity workflow.
45+
previous_language = await nexus_client.execute_operation(
46+
NexusGreetingService.set_language,
47+
SetLanguageInput(language=Language.ARABIC, user_id=user_id),
48+
)
49+
50+
# Call a Nexus operation backed by a query to confirm the language change.
51+
current_language = await nexus_client.execute_operation(
52+
NexusGreetingService.get_language,
53+
GetLanguageInput(user_id=user_id),
54+
)
55+
if current_language != Language.ARABIC:
56+
raise ApplicationError(f"Expected language ARABIC, got {current_language}")
57+
58+
log.append(
59+
f"Language changed: {previous_language.name} -> {Language.ARABIC.name}"
60+
)
61+
workflow.logger.info(
62+
"Language changed from %s to %s", previous_language, Language.ARABIC
63+
)
64+
65+
# Call a Nexus operation backed by a signal against the entity workflow.
66+
await nexus_client.execute_operation(
67+
NexusGreetingService.approve,
68+
ApproveInput(name="caller", user_id=user_id),
69+
)
70+
log.append("Workflow approved")
71+
workflow.logger.info("Workflow approved")
72+
73+
return log

nexus_messaging/callerpattern/handler/__init__.py

Whitespace-only changes.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from temporalio import activity
5+
6+
from nexus_messaging.callerpattern.service import Language
7+
8+
9+
@activity.defn
10+
async def call_greeting_service(language: Language) -> Optional[str]:
11+
"""Simulates a call to a remote greeting service. Returns None if unsupported."""
12+
greetings = {
13+
Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645",
14+
Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c",
15+
Language.ENGLISH: "Hello, world",
16+
Language.FRENCH: "Bonjour, monde",
17+
Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e",
18+
Language.PORTUGUESE: "Ol\u00e1 mundo",
19+
Language.SPANISH: "Hola mundo",
20+
}
21+
await asyncio.sleep(0.2)
22+
return greetings.get(language)

0 commit comments

Comments
 (0)