Skip to content

Commit 37d4372

Browse files
Sample code for Nexus messaging (#454)
* ˚ * Working on Nexus messaging samples * ˚ * Working on Nexus messaging samples * Nexus messaging sample is working now * update to the readme * Updates from a code review * Lint results * Fixes for the linter and increased a timeout for a flaky test * Lint fixes * Update nexus-messaging/src/callerpattern/caller/workflows.ts Co-authored-by: Chris Olszewski <chris.olszewski@temporal.io> * Update nexus-messaging/src/ondemandpattern/caller/workflows.ts Co-authored-by: Chris Olszewski <chris.olszewski@temporal.io> * Update nexus-messaging/src/ondemandpattern/caller/workflows.ts Co-authored-by: Chris Olszewski <chris.olszewski@temporal.io> --------- Co-authored-by: Chris Olszewski <chris.olszewski@temporal.io>
1 parent 280167f commit 37d4372

26 files changed

Lines changed: 1037 additions & 22 deletions

nexus-cancellation/README.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,24 @@ All APIs are experimental and may be subject to backwards-incompatible changes.
4444
2. Make sure you have a local [Temporal Server](https://github.com/temporalio/cli/#installation) running:
4545

4646
```sh
47-
temporal server start-dev --port 7233
47+
temporal server start-dev
4848
```
4949

5050
3. Create the expected namespaces:
5151

52-
```bash
53-
temporal operator namespace create --namespace my-caller-namespace
54-
temporal operator namespace create --namespace my-target-namespace
55-
```
52+
```bash
53+
temporal operator namespace create --namespace my-caller-namespace
54+
temporal operator namespace create --namespace my-target-namespace
55+
```
5656

5757
4. Setup the Nexus Endpoint on the caller namespace:
5858

59-
```bash
60-
temporal operator nexus endpoint create \
61-
--name my-nexus-endpoint-name \
62-
--target-namespace my-target-namespace \
63-
--target-task-queue my-handler-task-queue
64-
```
59+
```bash
60+
temporal operator nexus endpoint create \
61+
--name my-nexus-endpoint-name \
62+
--target-namespace my-target-namespace \
63+
--target-task-queue my-handler-task-queue
64+
```
6565

6666
### Execution
6767

@@ -73,16 +73,16 @@ temporal operator nexus endpoint create \
7373

7474
Example output:
7575

76-
```bash
77-
Echo message: This message is from the client
76+
```bash
77+
Echo message: This message is from the client
7878

79-
--- Testing cancellable workflow (normal completion) ---
80-
Completed message: Hello, Temporal!
79+
--- Testing cancellable workflow (normal completion) ---
80+
Completed message: Hello, Temporal!
8181

82-
--- Testing cancellable workflow (with cancellation) ---
83-
Started cancellable workflow: workflow-cancelled-A1B2C3D4
84-
Workflow was cancelled as expected: NexusOperationFailure: ...
85-
```
82+
--- Testing cancellable workflow (with cancellation) ---
83+
Started cancellable workflow: workflow-cancelled-A1B2C3D4
84+
Workflow was cancelled as expected: NexusOperationFailure: ...
85+
```
8686

8787
## Key Concepts
8888

nexus-messaging/.eslintignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
node_modules
2+
lib
3+
.eslintrc.js

nexus-messaging/.eslintrc.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const { builtinModules } = require('module');
2+
3+
const ALLOWED_NODE_BUILTINS = new Set(['assert']);
4+
5+
module.exports = {
6+
root: true,
7+
parser: '@typescript-eslint/parser',
8+
parserOptions: {
9+
project: './tsconfig.json',
10+
tsconfigRootDir: __dirname,
11+
},
12+
plugins: ['@typescript-eslint', 'deprecation'],
13+
extends: [
14+
'eslint:recommended',
15+
'plugin:@typescript-eslint/eslint-recommended',
16+
'plugin:@typescript-eslint/recommended',
17+
'prettier',
18+
],
19+
rules: {
20+
// recommended for safety
21+
'@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
22+
'deprecation/deprecation': 'warn',
23+
24+
// code style preference
25+
'object-shorthand': ['error', 'always'],
26+
27+
// relaxed rules, for convenience
28+
'@typescript-eslint/no-unused-vars': [
29+
'warn',
30+
{
31+
argsIgnorePattern: '^_',
32+
varsIgnorePattern: '^_',
33+
},
34+
],
35+
'@typescript-eslint/no-explicit-any': 'off',
36+
},
37+
overrides: [
38+
{
39+
files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts', 'src/**/workflows.ts'],
40+
rules: {
41+
'no-restricted-imports': [
42+
'error',
43+
...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
44+
],
45+
},
46+
},
47+
],
48+
};

nexus-messaging/.prettierrc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
printWidth: 120
2+
singleQuote: true

nexus-messaging/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus
2+
operations. There are two self-contained examples, each in its own directory under `src/`:
3+
4+
| | `callerpattern/` | `ondemandpattern/` |
5+
| ------------------------------ | ------------------------------------ | ------------------------------------------------------------ |
6+
| **Pattern** | Signal an existing workflow | Create and run workflows on demand, and send signals to them |
7+
| **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
8+
| **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
9+
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |
10+
11+
Each directory is fully self-contained for clarity. The
12+
`GreetingWorkflow`, activities, and `Language` type are **identical** between the two -- only the
13+
Nexus service definition and its handler implementation differ. This highlights that the same workflow can be
14+
exposed through Nexus in different ways depending on whether the caller needs lifecycle control.
15+
16+
See each directory's README for running instructions.

nexus-messaging/package.json

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"name": "nexus-messaging",
3+
"version": "0.1.0",
4+
"private": true,
5+
"scripts": {
6+
"build": "tsc --build",
7+
"build.watch": "tsc --build --watch",
8+
"format": "prettier --write .",
9+
"format:check": "prettier --check .",
10+
"lint": "eslint .",
11+
"start.callerpattern.service": "ts-node src/callerpattern/service/worker.ts",
12+
"start.callerpattern.caller": "ts-node src/callerpattern/caller/worker.ts",
13+
"workflow.callerpattern": "ts-node src/callerpattern/starter.ts",
14+
"start.ondemandpattern.service": "ts-node src/ondemandpattern/service/worker.ts",
15+
"start.ondemandpattern.caller": "ts-node src/ondemandpattern/caller/worker.ts",
16+
"workflow.ondemandpattern": "ts-node src/ondemandpattern/starter.ts"
17+
},
18+
"nodemonConfig": {
19+
"execMap": {
20+
"ts": "ts-node"
21+
},
22+
"ext": "ts",
23+
"watch": [
24+
"src"
25+
]
26+
},
27+
"dependencies": {
28+
"@temporalio/activity": "^1.15.0",
29+
"@temporalio/client": "^1.15.0",
30+
"@temporalio/envconfig": "^1.15.0",
31+
"@temporalio/nexus": "^1.15.0",
32+
"@temporalio/worker": "^1.15.0",
33+
"@temporalio/workflow": "^1.15.0",
34+
"nexus-rpc": "^0.0.2",
35+
"nanoid": "3.x"
36+
},
37+
"devDependencies": {
38+
"@tsconfig/node22": "^22.0.0",
39+
"@types/node": "^22.9.1",
40+
"@typescript-eslint/eslint-plugin": "^8.18.0",
41+
"@typescript-eslint/parser": "^8.18.0",
42+
"eslint": "^8.57.1",
43+
"eslint-config-prettier": "^9.1.0",
44+
"eslint-plugin-deprecation": "^3.0.0",
45+
"nodemon": "^3.1.7",
46+
"prettier": "^3.4.2",
47+
"ts-node": "^10.9.2",
48+
"typescript": "^5.6.3"
49+
}
50+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
## Caller pattern
2+
3+
The handler worker starts a `GreetingWorkflow` for a user ID at boot.
4+
`NexusGreetingService` holds that ID and routes every Nexus operation to it.
5+
The caller's input does not have that workflow ID as the caller doesn't know it -- but the caller sends in the User ID,
6+
and `NexusGreetingService` knows how to get the desired workflow ID from that User ID (via the `GreetingWorkflow_for_<userId>` prefix).
7+
8+
The handler worker uses the same prefix to generate a workflow ID from a user ID when it launches the workflow.
9+
10+
The caller workflow:
11+
12+
1. Queries for supported languages (`getLanguages` -- backed by a query handler)
13+
2. Queries the current language (`getLanguage`)
14+
3. Changes the language to French (`setLanguage` -- backed by an update handler that calls an activity)
15+
4. Approves the workflow (`approve` -- backed by a signal handler)
16+
17+
### Running
18+
19+
Start a Temporal server:
20+
21+
```bash
22+
temporal server start-dev
23+
```
24+
25+
Create the namespaces and Nexus endpoint:
26+
27+
```bash
28+
temporal operator namespace create --namespace nexus-messaging-handler-namespace
29+
temporal operator namespace create --namespace nexus-messaging-caller-namespace
30+
31+
temporal operator nexus endpoint create \
32+
--name nexus-messaging-nexus-endpoint \
33+
--target-namespace nexus-messaging-handler-namespace \
34+
--target-task-queue nexus-messaging-handler-task-queue
35+
```
36+
37+
Install dependencies from the `nexus-messaging` directory:
38+
39+
```bash
40+
pnpm install
41+
```
42+
43+
In one terminal, start the handler worker:
44+
45+
```bash
46+
npm run start.callerpattern.service
47+
```
48+
49+
In a second terminal, start the caller worker:
50+
51+
```bash
52+
npm run start.callerpattern.caller
53+
```
54+
55+
In a third terminal, start the caller workflow:
56+
57+
```bash
58+
npm run workflow.callerpattern
59+
```
60+
61+
Expected output:
62+
63+
```
64+
languages: chinese, english
65+
current language: english
66+
set language to french, previous was: english
67+
approved
68+
```
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import * as nexus from 'nexus-rpc';
2+
3+
export const NEXUS_ENDPOINT = 'nexus-messaging-nexus-endpoint';
4+
export const HANDLER_TASK_QUEUE = 'nexus-messaging-handler-task-queue';
5+
export const HANDLER_NAMESPACE = 'nexus-messaging-handler-namespace';
6+
export const CALLER_NAMESPACE = 'nexus-messaging-caller-namespace';
7+
8+
export const nexusGreetingService = nexus.service('NexusGreetingService', {
9+
/**
10+
* Returns the list of all known languages for the given user's entity workflow.
11+
*/
12+
getLanguages: nexus.operation<GetLanguagesInput, GetLanguagesOutput>(),
13+
14+
/**
15+
* Returns the current greeting language for the given user's entity workflow.
16+
*/
17+
getLanguage: nexus.operation<GetLanguageInput, GetLanguageOutput>(),
18+
19+
/**
20+
* Sets the greeting language for the given user's entity workflow via an update.
21+
*/
22+
setLanguage: nexus.operation<SetLanguageInput, SetLanguageOutput>(),
23+
24+
/**
25+
* Approves (completes) the given user's entity workflow via a signal.
26+
*/
27+
approve: nexus.operation<ApproveInput, void>(),
28+
});
29+
30+
export type Language = 'arabic' | 'chinese' | 'english' | 'french' | 'hindi' | 'portuguese' | 'spanish';
31+
32+
export interface GetLanguagesInput {
33+
userId: string;
34+
}
35+
36+
export type GetLanguagesOutput = Language[];
37+
38+
export interface GetLanguageInput {
39+
userId: string;
40+
}
41+
42+
export type GetLanguageOutput = Language;
43+
44+
export interface SetLanguageInput {
45+
userId: string;
46+
language: Language;
47+
}
48+
49+
export type SetLanguageOutput = Language;
50+
51+
export interface ApproveInput {
52+
userId: string;
53+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { NativeConnection, Worker } from '@temporalio/worker';
2+
import { loadClientConnectConfig } from '@temporalio/envconfig';
3+
import { CALLER_NAMESPACE } from '../api';
4+
5+
const CALLER_TASK_QUEUE = 'nexus-messaging-caller-task-queue';
6+
7+
async function run() {
8+
const config = loadClientConnectConfig();
9+
const connection = await NativeConnection.connect(config.connectionOptions);
10+
try {
11+
const worker = await Worker.create({
12+
connection,
13+
namespace: CALLER_NAMESPACE,
14+
taskQueue: CALLER_TASK_QUEUE,
15+
workflowsPath: require.resolve('./workflows'),
16+
});
17+
18+
await worker.run();
19+
} finally {
20+
await connection.close();
21+
}
22+
}
23+
24+
run().catch((err) => {
25+
console.error(err);
26+
process.exit(1);
27+
});
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import * as wf from '@temporalio/workflow';
2+
import { nexusGreetingService, NEXUS_ENDPOINT, Language } from '../api';
3+
4+
const nexusClient = wf.createNexusServiceClient({
5+
service: nexusGreetingService,
6+
endpoint: NEXUS_ENDPOINT,
7+
});
8+
9+
export async function callerWorkflow(userId: string): Promise<string[]> {
10+
const log: string[] = [];
11+
12+
// Query the list of known languages
13+
const languages = await nexusClient.executeOperation('getLanguages', { userId }, { scheduleToCloseTimeout: '10s' });
14+
log.push(`languages: ${languages.join(', ')}`);
15+
16+
// Query the current language
17+
const currentLanguage = await nexusClient.executeOperation(
18+
'getLanguage',
19+
{ userId },
20+
{ scheduleToCloseTimeout: '10s' },
21+
);
22+
log.push(`current language: ${currentLanguage}`);
23+
24+
// Set language to French via update
25+
const previousLanguage: Language = await nexusClient.executeOperation(
26+
'setLanguage',
27+
{ userId, language: 'french' },
28+
{ scheduleToCloseTimeout: '10s' },
29+
);
30+
log.push(`set language to french, previous was: ${previousLanguage}`);
31+
32+
// Approve (signal) the entity workflow
33+
await nexusClient.executeOperation('approve', { userId }, { scheduleToCloseTimeout: '10s' });
34+
log.push('approved');
35+
36+
return log;
37+
}

0 commit comments

Comments
 (0)