Skip to content

Commit 344e039

Browse files
committed
Nexus prototype
uv add --editable ../sdk-python uv add --group nexus --editable ../nexus-sdk-python
1 parent 7a1dd4d commit 344e039

16 files changed

+1045
-255
lines changed

nexus/README.md

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# nexus
2+
3+
Temporal Nexus is a new feature of the Temporal platform designed to connect durable executions across team, namespace,
4+
region, and cloud boundaries. It promotes a more modular architecture for sharing a subset of your team’s capabilities
5+
via well-defined service API contracts for other teams to use, that abstract underlying Temporal primitives, like
6+
Workflows, or execute arbitrary code.
7+
8+
Learn more at [temporal.io/nexus](https://temporal.io/nexus).
9+
10+
This sample shows how to use Temporal for authoring a Nexus service and call it from a workflow.
11+
12+
### Sample directory structure
13+
14+
- [service](./service) - shared service defintion
15+
- [caller](./caller) - caller workflows, worker, and starter
16+
- [handler](./handler) - handler workflow, operations, and worker
17+
- [options](./options) - command line argument parsing utility
18+
19+
## Getting started locally
20+
21+
### Get `temporal` CLI to enable local development
22+
23+
1. Follow the instructions on the [docs
24+
site](https://learn.temporal.io/getting_started/go/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli)
25+
to install Temporal CLI.
26+
27+
> NOTE: Required version is at least v1.1.0.
28+
29+
### Spin up environment
30+
31+
#### Start temporal server
32+
33+
> HTTP port is required for Nexus communications
34+
35+
```
36+
temporal server start-dev --http-port 7243 --dynamic-config-value system.enableNexus=true
37+
```
38+
39+
### Initialize environment
40+
41+
#### Create caller and target namespaces
42+
43+
```
44+
temporal operator namespace create --namespace my-target-namespace
45+
temporal operator namespace create --namespace my-caller-namespace
46+
```
47+
48+
#### Create Nexus endpoint
49+
50+
```
51+
temporal operator nexus endpoint create \
52+
--name my-nexus-endpoint-name \
53+
--target-namespace my-target-namespace \
54+
--target-task-queue my-target-task-queue \
55+
--description-file ./nexus/service/description.md
56+
```
57+
58+
### Start Nexus handler worker
59+
```
60+
uv run python nexus/handler/worker.py
61+
```
62+
63+
### Run Nexus caller application (worker + starter)
64+
```
65+
uv run python nexus/caller/app.py
66+
```
67+
68+
### Output
69+
70+
TODO

nexus/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from rich.traceback import install
2+
3+
install(show_locals=True)

nexus/caller/app.py

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
import sys
3+
from typing import Any, Type
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
7+
8+
from nexus.caller.workflows import (
9+
Echo2CallerWorkflow,
10+
Echo3CallerWorkflow,
11+
EchoCallerWorkflow,
12+
Hello2CallerWorkflow,
13+
HelloCallerWorkflow,
14+
)
15+
16+
interrupt_event = asyncio.Event()
17+
18+
19+
async def execute_workflow(workflow_cls: Type[Any], input: Any) -> None:
20+
client = await Client.connect("localhost:7233", namespace="my-caller-namespace")
21+
task_queue = "my-caller-task-queue"
22+
23+
async with Worker(
24+
client,
25+
task_queue=task_queue,
26+
workflows=[workflow_cls],
27+
workflow_runner=UnsandboxedWorkflowRunner(),
28+
):
29+
print("🟠 Caller worker started")
30+
result = await client.execute_workflow(
31+
workflow_cls.run,
32+
input,
33+
id="my-caller-workflow-id",
34+
task_queue=task_queue,
35+
)
36+
print("🟢 workflow result:", result)
37+
38+
39+
if __name__ == "__main__":
40+
if len(sys.argv) != 2:
41+
print("Usage: python -m nexus.caller.app [echo|hello]")
42+
sys.exit(1)
43+
44+
[wf_name] = sys.argv[1:]
45+
coro = {
46+
"echo": execute_workflow(EchoCallerWorkflow, "hello"),
47+
"echo2": execute_workflow(Echo2CallerWorkflow, "hello"),
48+
"echo3": execute_workflow(Echo3CallerWorkflow, "hello"),
49+
"hello": execute_workflow(HelloCallerWorkflow, "world"),
50+
"hello2": execute_workflow(Hello2CallerWorkflow, "world"),
51+
}[wf_name]
52+
53+
loop = asyncio.new_event_loop()
54+
try:
55+
loop.run_until_complete(coro)
56+
except KeyboardInterrupt:
57+
interrupt_event.set()
58+
loop.run_until_complete(loop.shutdown_asyncgens())

nexus/caller/workflows.py

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from datetime import timedelta
2+
3+
import xray
4+
from temporalio import workflow
5+
from temporalio.workflow import NexusClient
6+
7+
from nexus.service.interface import (
8+
EchoInput,
9+
EchoOutput,
10+
HelloInput,
11+
HelloOutput,
12+
MyNexusService,
13+
)
14+
15+
16+
class CallerWorkflowBase:
17+
def __init__(self):
18+
self.nexus_client = NexusClient(
19+
MyNexusService, # or string name "my-nexus-service",
20+
"my-nexus-endpoint-name",
21+
schedule_to_close_timeout=timedelta(seconds=30),
22+
)
23+
24+
25+
@workflow.defn
26+
class EchoCallerWorkflow(CallerWorkflowBase):
27+
@xray.start_as_current_workflow_method_span()
28+
@workflow.run
29+
async def run(self, message: str) -> EchoOutput:
30+
op_output = await self.nexus_client.execute_operation(
31+
MyNexusService.echo,
32+
EchoInput(message),
33+
)
34+
return op_output
35+
36+
37+
@workflow.defn
38+
class Echo2CallerWorkflow(CallerWorkflowBase):
39+
@xray.start_as_current_workflow_method_span()
40+
@workflow.run
41+
async def run(self, message: str) -> EchoOutput:
42+
op_output = await self.nexus_client.execute_operation(
43+
MyNexusService.echo2,
44+
EchoInput(message),
45+
)
46+
return op_output
47+
48+
49+
@workflow.defn
50+
class Echo3CallerWorkflow(CallerWorkflowBase):
51+
@xray.start_as_current_workflow_method_span()
52+
@workflow.run
53+
async def run(self, message: str) -> EchoOutput:
54+
op_output = await self.nexus_client.execute_operation(
55+
MyNexusService.echo3,
56+
EchoInput(message),
57+
)
58+
return op_output
59+
60+
61+
@workflow.defn
62+
class HelloCallerWorkflow(CallerWorkflowBase):
63+
@xray.start_as_current_workflow_method_span()
64+
@workflow.run
65+
async def run(self, name: str) -> HelloOutput:
66+
# TODO: Java returns a handle immediately. The handle has a blocking method to
67+
# wait until the operation has started (i.e. initial Nexus RPC response is
68+
# available, so operation ID is available in the case of an async operation).
69+
handle = await self.nexus_client.start_operation(
70+
MyNexusService.hello,
71+
HelloInput(name),
72+
)
73+
op_output = await handle
74+
return op_output
75+
76+
77+
@workflow.defn
78+
class Hello2CallerWorkflow(CallerWorkflowBase):
79+
@xray.start_as_current_workflow_method_span()
80+
@workflow.run
81+
async def run(self, name: str) -> HelloOutput:
82+
handle = await self.nexus_client.start_operation(
83+
MyNexusService.hello2,
84+
HelloInput(name),
85+
)
86+
op_output = await handle
87+
return op_output
88+
89+
90+
@workflow.defn
91+
class Hello3CallerWorkflow(CallerWorkflowBase):
92+
@xray.start_as_current_workflow_method_span()
93+
@workflow.run
94+
async def run(self, name: str) -> HelloOutput:
95+
handle = await self.nexus_client.start_operation(
96+
MyNexusService.hello3,
97+
HelloInput(name),
98+
)
99+
op_output = await handle
100+
return op_output

nexus/clean

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
temporal-delete-all my-target-namespace
2+
temporal-delete-all my-caller-namespace
3+
4+
temporal operator namespace create --namespace my-target-namespace
5+
temporal operator namespace create --namespace my-caller-namespace
6+
7+
sleep 1
8+
9+
temporal operator nexus endpoint create \
10+
--name my-nexus-endpoint-name \
11+
--target-namespace my-target-namespace \
12+
--target-task-queue my-target-task-queue \
13+
--description-file ./nexus/service/description.md
14+

nexus/diagram.d2

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
shape: sequence_diagram
2+
3+
S: {
4+
label: "Cloud Cell\n\n(handler namespace)"
5+
}
6+
7+
NW: {
8+
label: "Nexus\nWorker\n\n(User's codebase)"
9+
}
10+
11+
NSDK: {
12+
label: "Nexus\nSDK\n\n(nexus-rpc)"
13+
}
14+
15+
NH: {
16+
label: "Nexus\nHandler"
17+
}
18+
19+
type-check: "type-check time" {
20+
NW.00: "type-check operation impls\n\nE.g. Workflow.run signature\nmust match operation I/O types"
21+
NW.01: "type-check call sites\n\nE.g.fetchResult"
22+
}
23+
24+
import-time: "import time" {
25+
NW.10: import MyService
26+
NW -> NSDK: service_decorator(\ninterface, impl)
27+
NSDK.1: validate impl\nagainst interface
28+
}
29+
30+
start-worker: "start Worker" {
31+
NW.12: "Worker(nexus_services=[MyService()])"
32+
NW.14: "worker now has Service and \nOperation instances keyed by name\n\nthe Operation instances implement\nstart/cancel/getInfo/getResult interface"
33+
}
34+
handle-request: "handle request" {
35+
S -> NW: dispatch NexusTask
36+
NW -> NH: "lookup\nService and Operation\ninstance by name"
37+
NH.1: "start()"
38+
}

nexus/handler/activities.py

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import asyncio
2+
3+
from temporalio import activity
4+
5+
from nexus.service.interface import (
6+
HelloInput,
7+
HelloOutput,
8+
)
9+
10+
11+
@activity.defn
12+
async def hello_activity(input: HelloInput) -> HelloOutput:
13+
await asyncio.sleep(1)
14+
return HelloOutput(message=f"hello {input.name}")

nexus/handler/dbclient.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class MyDBClient:
2+
def execute(self, query: str) -> str:
3+
return "<query result>"

0 commit comments

Comments
 (0)