Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Nexus sample #174

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions nexus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# nexus

Temporal Nexus is a new feature of the Temporal platform designed to connect durable executions across team, namespace,
region, and cloud boundaries. It promotes a more modular architecture for sharing a subset of your team’s capabilities
via well-defined service API contracts for other teams to use, that abstract underlying Temporal primitives, like
Workflows, or execute arbitrary code.

Learn more at [temporal.io/nexus](https://temporal.io/nexus).

This sample shows how to use Temporal for authoring a Nexus service and call it from a workflow.

### Sample directory structure

- [service](./service) - shared service defintion
- [caller](./caller) - caller workflows, worker, and starter
- [handler](./handler) - handler workflow, operations, and worker
- [options](./options) - command line argument parsing utility

## Getting started locally

### Get `temporal` CLI to enable local development

1. Follow the instructions on the [docs
site](https://learn.temporal.io/getting_started/go/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli)
to install Temporal CLI.

> NOTE: Required version is at least v1.1.0.

### Spin up environment

#### Start temporal server

> HTTP port is required for Nexus communications

```
temporal server start-dev --http-port 7243 --dynamic-config-value system.enableNexus=true
```

### Initialize environment

#### Create caller and target namespaces

```
temporal operator namespace create --namespace my-target-namespace
temporal operator namespace create --namespace my-caller-namespace
```

#### Create Nexus endpoint

```
temporal operator nexus endpoint create \
--name my-nexus-endpoint-name \
--target-namespace my-target-namespace \
--target-task-queue my-target-task-queue \
--description-file ./nexus/service/description.md
```

### Start Nexus handler worker
```
uv run python nexus/handler/worker.py
```

### Run Nexus caller application (worker + starter)
```
uv run python nexus/caller/app.py
```

### Output

TODO
3 changes: 3 additions & 0 deletions nexus/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from rich.traceback import install

install(show_locals=True)
58 changes: 58 additions & 0 deletions nexus/caller/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import asyncio
import sys
from typing import Any, Type

from temporalio.client import Client
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from nexus.caller.workflows import (
Echo2CallerWorkflow,
Echo3CallerWorkflow,
EchoCallerWorkflow,
Hello2CallerWorkflow,
HelloCallerWorkflow,
)

interrupt_event = asyncio.Event()


async def execute_workflow(workflow_cls: Type[Any], input: Any) -> None:
client = await Client.connect("localhost:7233", namespace="my-caller-namespace")
task_queue = "my-caller-task-queue"

async with Worker(
client,
task_queue=task_queue,
workflows=[workflow_cls],
workflow_runner=UnsandboxedWorkflowRunner(),
):
print("🟠 Caller worker started")
result = await client.execute_workflow(
workflow_cls.run,
input,
id="my-caller-workflow-id",
task_queue=task_queue,
)
print("🟢 workflow result:", result)


if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python -m nexus.caller.app [echo|hello]")
sys.exit(1)

[wf_name] = sys.argv[1:]
coro = {
"echo": execute_workflow(EchoCallerWorkflow, "hello"),
"echo2": execute_workflow(Echo2CallerWorkflow, "hello"),
"echo3": execute_workflow(Echo3CallerWorkflow, "hello"),
"hello": execute_workflow(HelloCallerWorkflow, "world"),
"hello2": execute_workflow(Hello2CallerWorkflow, "world"),
}[wf_name]

loop = asyncio.new_event_loop()
try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
100 changes: 100 additions & 0 deletions nexus/caller/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from datetime import timedelta

import xray
from temporalio import workflow
from temporalio.workflow import NexusClient

from nexus.service.interface import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
MyNexusService,
)


class CallerWorkflowBase:
def __init__(self):
self.nexus_client = NexusClient(
MyNexusService, # or string name "my-nexus-service",
"my-nexus-endpoint-name",
schedule_to_close_timeout=timedelta(seconds=30),
)


@workflow.defn
class EchoCallerWorkflow(CallerWorkflowBase):
@xray.start_as_current_workflow_method_span()
@workflow.run
async def run(self, message: str) -> EchoOutput:
op_output = await self.nexus_client.execute_operation(
MyNexusService.echo,
EchoInput(message),
)
return op_output


@workflow.defn
class Echo2CallerWorkflow(CallerWorkflowBase):
@xray.start_as_current_workflow_method_span()
@workflow.run
async def run(self, message: str) -> EchoOutput:
op_output = await self.nexus_client.execute_operation(
MyNexusService.echo2,
EchoInput(message),
)
return op_output


@workflow.defn
class Echo3CallerWorkflow(CallerWorkflowBase):
@xray.start_as_current_workflow_method_span()
@workflow.run
async def run(self, message: str) -> EchoOutput:
op_output = await self.nexus_client.execute_operation(
MyNexusService.echo3,
EchoInput(message),
)
return op_output


@workflow.defn
class HelloCallerWorkflow(CallerWorkflowBase):
@xray.start_as_current_workflow_method_span()
@workflow.run
async def run(self, name: str) -> HelloOutput:
# TODO: Java returns a handle immediately. The handle has a blocking method to
# wait until the operation has started (i.e. initial Nexus RPC response is
# available, so operation ID is available in the case of an async operation).
handle = await self.nexus_client.start_operation(
MyNexusService.hello,
HelloInput(name),
)
op_output = await handle
return op_output


@workflow.defn
class Hello2CallerWorkflow(CallerWorkflowBase):
@xray.start_as_current_workflow_method_span()
@workflow.run
async def run(self, name: str) -> HelloOutput:
handle = await self.nexus_client.start_operation(
MyNexusService.hello2,
HelloInput(name),
)
op_output = await handle
return op_output


@workflow.defn
class Hello3CallerWorkflow(CallerWorkflowBase):
@xray.start_as_current_workflow_method_span()
@workflow.run
async def run(self, name: str) -> HelloOutput:
handle = await self.nexus_client.start_operation(
MyNexusService.hello3,
HelloInput(name),
)
op_output = await handle
return op_output
14 changes: 14 additions & 0 deletions nexus/clean
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
temporal-delete-all my-target-namespace
temporal-delete-all my-caller-namespace

temporal operator namespace create --namespace my-target-namespace
temporal operator namespace create --namespace my-caller-namespace

sleep 1

temporal operator nexus endpoint create \
--name my-nexus-endpoint-name \
--target-namespace my-target-namespace \
--target-task-queue my-target-task-queue \
--description-file ./nexus/service/description.md

38 changes: 38 additions & 0 deletions nexus/diagram.d2
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
shape: sequence_diagram

S: {
label: "Cloud Cell\n\n(handler namespace)"
}

NW: {
label: "Nexus\nWorker\n\n(User's codebase)"
}

NSDK: {
label: "Nexus\nSDK\n\n(nexus-rpc)"
}

NH: {
label: "Nexus\nHandler"
}

type-check: "type-check time" {
NW.00: "type-check operation impls\n\nE.g. Workflow.run signature\nmust match operation I/O types"
NW.01: "type-check call sites\n\nE.g.fetchResult"
}

import-time: "import time" {
NW.10: import MyService
NW -> NSDK: service_decorator(\ninterface, impl)
NSDK.1: validate impl\nagainst interface
}

start-worker: "start Worker" {
NW.12: "Worker(nexus_services=[MyService()])"
NW.14: "worker now has Service and \nOperation instances keyed by name\n\nthe Operation instances implement\nstart/cancel/getInfo/getResult interface"
}
handle-request: "handle request" {
S -> NW: dispatch NexusTask
NW -> NH: "lookup\nService and Operation\ninstance by name"
NH.1: "start()"
}
14 changes: 14 additions & 0 deletions nexus/handler/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import asyncio

from temporalio import activity

from nexus.service.interface import (
HelloInput,
HelloOutput,
)


@activity.defn
async def hello_activity(input: HelloInput) -> HelloOutput:
await asyncio.sleep(1)
return HelloOutput(message=f"hello {input.name}")
3 changes: 3 additions & 0 deletions nexus/handler/dbclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class MyDBClient:
def execute(self, query: str) -> str:
return "<query result>"
Loading
Loading