Skip to content

Commit 0448a6b

Browse files
committed
Nexus prototype
uv add --editable ../sdk-python uv add --group nexus --editable ../nexus-sdk-python
1 parent 7a1dd4d commit 0448a6b

14 files changed

+1329
-358
lines changed

Diff for: hello_nexus/README.md

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# nexus
2+
3+
Temporal Nexus is a new feature of the Temporal platform designed to connect durable executions across team, namespace,
4+
region, and cloud boundaries. It promotes a more modular architecture for sharing a subset of your team’s capabilities
5+
via well-defined service API contracts for other teams to use, that abstract underlying Temporal primitives, like
6+
Workflows, or execute arbitrary code.
7+
8+
Learn more at [temporal.io/nexus](https://temporal.io/nexus).
9+
10+
This sample shows how to use Temporal for authoring a Nexus service and call it from a workflow.
11+
12+
### Sample directory structure
13+
14+
- [service](./service) - shared service defintion
15+
- [caller](./caller) - caller workflows, worker, and starter
16+
- [handler](./handler) - handler workflow, operations, and worker
17+
- [options](./options) - command line argument parsing utility
18+
19+
## Getting started locally
20+
21+
### Get `temporal` CLI to enable local development
22+
23+
1. Follow the instructions on the [docs
24+
site](https://learn.temporal.io/getting_started/go/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli)
25+
to install Temporal CLI.
26+
27+
> NOTE: Required version is at least v1.1.0.
28+
29+
### Spin up environment
30+
31+
#### Start temporal server
32+
33+
> HTTP port is required for Nexus communications
34+
35+
```
36+
temporal server start-dev --http-port 7243 --dynamic-config-value system.enableNexus=true
37+
```
38+
39+
### Initialize environment
40+
41+
#### Create caller and target namespaces
42+
43+
```
44+
temporal operator namespace create --namespace my-target-namespace
45+
temporal operator namespace create --namespace my-caller-namespace
46+
```
47+
48+
#### Create Nexus endpoint
49+
50+
```
51+
temporal operator nexus endpoint create \
52+
--name my-nexus-endpoint-name \
53+
--target-namespace my-target-namespace \
54+
--target-task-queue my-target-task-queue \
55+
--description-file ./nexus/service/description.md
56+
```
57+
58+
### Start Nexus handler worker
59+
```
60+
uv run python nexus/handler/worker.py
61+
```
62+
63+
### Run Nexus caller application (worker + starter)
64+
```
65+
uv run python nexus/caller/app.py
66+
```
67+
68+
### Output
69+
70+
TODO

Diff for: hello_nexus/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from rich.traceback import install
2+
3+
install(show_locals=True)

Diff for: hello_nexus/caller/app.py

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
import sys
3+
from typing import Any, Type
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
7+
8+
from hello_nexus.caller.workflows import (
9+
Echo2CallerWorkflow,
10+
Echo3CallerWorkflow,
11+
EchoCallerWorkflow,
12+
Hello2CallerWorkflow,
13+
HelloCallerWorkflow,
14+
)
15+
16+
interrupt_event = asyncio.Event()
17+
18+
19+
async def execute_workflow(workflow_cls: Type[Any], input: Any) -> None:
20+
client = await Client.connect("localhost:7233", namespace="my-caller-namespace")
21+
task_queue = "my-caller-task-queue"
22+
23+
async with Worker(
24+
client,
25+
task_queue=task_queue,
26+
workflows=[workflow_cls],
27+
workflow_runner=UnsandboxedWorkflowRunner(),
28+
):
29+
print("🟠 Caller worker started")
30+
result = await client.execute_workflow(
31+
workflow_cls.run,
32+
input,
33+
id="my-caller-workflow-id",
34+
task_queue=task_queue,
35+
)
36+
print("🟢 workflow result:", result)
37+
38+
39+
if __name__ == "__main__":
40+
if len(sys.argv) != 2:
41+
print("Usage: python -m nexus.caller.app [echo|hello]")
42+
sys.exit(1)
43+
44+
[wf_name] = sys.argv[1:]
45+
fn = {
46+
"echo": lambda: execute_workflow(EchoCallerWorkflow, "hello"),
47+
"echo2": lambda: execute_workflow(Echo2CallerWorkflow, "hello"),
48+
"echo3": lambda: execute_workflow(Echo3CallerWorkflow, "hello"),
49+
"hello": lambda: execute_workflow(HelloCallerWorkflow, "world"),
50+
"hello2": lambda: execute_workflow(Hello2CallerWorkflow, "world"),
51+
}[wf_name]
52+
53+
loop = asyncio.new_event_loop()
54+
try:
55+
loop.run_until_complete(fn())
56+
except KeyboardInterrupt:
57+
interrupt_event.set()
58+
loop.run_until_complete(loop.shutdown_asyncgens())

Diff for: hello_nexus/caller/workflows.py

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
from temporalio.exceptions import FailureError
5+
from temporalio.workflow import NexusClient
6+
7+
from hello_nexus.service.interface import (
8+
EchoInput,
9+
EchoOutput,
10+
HelloInput,
11+
HelloOutput,
12+
MyNexusService,
13+
)
14+
15+
16+
class CallerWorkflowBase:
17+
def __init__(self):
18+
self.nexus_client = NexusClient(
19+
MyNexusService, # or string name "my-nexus-service",
20+
"my-nexus-endpoint-name",
21+
schedule_to_close_timeout=timedelta(seconds=30),
22+
)
23+
24+
25+
@workflow.defn
26+
class EchoCallerWorkflow(CallerWorkflowBase):
27+
@workflow.run
28+
async def run(self, message: str) -> EchoOutput:
29+
op_output = await self.nexus_client.execute_operation(
30+
MyNexusService.echo,
31+
EchoInput(message),
32+
)
33+
return op_output
34+
35+
36+
@workflow.defn
37+
class Echo2CallerWorkflow(CallerWorkflowBase):
38+
@workflow.run
39+
async def run(self, message: str) -> EchoOutput:
40+
op_output = await self.nexus_client.execute_operation(
41+
MyNexusService.echo2,
42+
EchoInput(message),
43+
)
44+
return op_output
45+
46+
47+
@workflow.defn
48+
class Echo3CallerWorkflow(CallerWorkflowBase):
49+
@workflow.run
50+
async def run(self, message: str) -> EchoOutput:
51+
op_output = await self.nexus_client.execute_operation(
52+
MyNexusService.echo3,
53+
EchoInput(message),
54+
)
55+
return op_output
56+
57+
58+
@workflow.defn
59+
class HelloCallerWorkflow(CallerWorkflowBase):
60+
@workflow.run
61+
async def run(self, name: str) -> HelloOutput:
62+
# TODO: Java returns a handle immediately. The handle has a blocking method to
63+
# wait until the operation has started (i.e. initial Nexus RPC response is
64+
# available, so operation ID is available in the case of an async operation).
65+
handle = await self.nexus_client.start_operation(
66+
MyNexusService.hello,
67+
HelloInput(name),
68+
)
69+
assert handle.cancel()
70+
try:
71+
await handle
72+
# TODO(dan): But should this be asyncio.CancelledError? (there's also
73+
# temporalio.exceptions.CancelledError which doesn't inherit from
74+
# asyncio.CancelledError; as a workflow author, this confused me)
75+
except FailureError:
76+
handle = await self.nexus_client.start_operation(
77+
MyNexusService.hello,
78+
HelloInput(name),
79+
)
80+
result = await handle
81+
return result
82+
raise AssertionError("Expected Nexus operation to be cancelled")
83+
84+
85+
@workflow.defn
86+
class Hello2CallerWorkflow(CallerWorkflowBase):
87+
@workflow.run
88+
async def run(self, name: str) -> HelloOutput:
89+
handle = await self.nexus_client.start_operation(
90+
MyNexusService.hello2,
91+
HelloInput(name),
92+
)
93+
return await handle
94+
95+
96+
@workflow.defn
97+
class Hello3CallerWorkflow(CallerWorkflowBase):
98+
@workflow.run
99+
async def run(self, name: str) -> HelloOutput:
100+
handle = await self.nexus_client.start_operation(
101+
MyNexusService.hello3,
102+
HelloInput(name),
103+
)
104+
return await handle

Diff for: hello_nexus/clean

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
temporal-delete-all my-target-namespace
2+
temporal-delete-all my-caller-namespace
3+
4+
temporal operator namespace create --namespace my-target-namespace
5+
temporal operator namespace create --namespace my-caller-namespace
6+
7+
sleep 1
8+
9+
temporal operator nexus endpoint create \
10+
--name my-nexus-endpoint-name \
11+
--target-namespace my-target-namespace \
12+
--target-task-queue my-target-task-queue \
13+
--description-file ./hello_nexus/service/description.md
14+

Diff for: hello_nexus/handler/activities.py

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import asyncio
2+
3+
from temporalio import activity
4+
5+
from hello_nexus.service.interface import (
6+
HelloInput,
7+
HelloOutput,
8+
)
9+
10+
11+
@activity.defn
12+
async def hello_activity(input: HelloInput) -> HelloOutput:
13+
await asyncio.sleep(1)
14+
return HelloOutput(message=f"hello {input.name}")

Diff for: hello_nexus/handler/dbclient.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class MyDBClient:
2+
def execute(self, query: str) -> str:
3+
return "<query result>"

0 commit comments

Comments
 (0)