Skip to content

Commit 95b76f9

Browse files
authored
Message handlers sample (#372)
1 parent 692fd26 commit 95b76f9

21 files changed

+558
-0
lines changed

Diff for: .husky/pre-commit renamed to .husky/pre-push

File renamed without changes.

Diff for: .scripts/list-of-samples.json

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"timer-examples",
3939
"timer-progress",
4040
"update",
41+
"updates_and_signals",
4142
"vscode-debugger",
4243
"worker-specific-task-queues",
4344
"worker-versioning"
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
node_modules
2+
lib
3+
.eslintrc.js
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const { builtinModules } = require('module');
2+
3+
const ALLOWED_NODE_BUILTINS = new Set(['assert']);
4+
5+
module.exports = {
6+
root: true,
7+
parser: '@typescript-eslint/parser',
8+
parserOptions: {
9+
project: './tsconfig.json',
10+
tsconfigRootDir: __dirname,
11+
},
12+
plugins: ['@typescript-eslint', 'deprecation'],
13+
extends: [
14+
'eslint:recommended',
15+
'plugin:@typescript-eslint/eslint-recommended',
16+
'plugin:@typescript-eslint/recommended',
17+
'prettier',
18+
],
19+
rules: {
20+
// recommended for safety
21+
'@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
22+
'deprecation/deprecation': 'warn',
23+
24+
// code style preference
25+
'object-shorthand': ['error', 'always'],
26+
27+
// relaxed rules, for convenience
28+
'@typescript-eslint/no-unused-vars': [
29+
'warn',
30+
{
31+
argsIgnorePattern: '^_',
32+
varsIgnorePattern: '^_',
33+
},
34+
],
35+
'@typescript-eslint/no-explicit-any': 'off',
36+
},
37+
overrides: [
38+
{
39+
files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'],
40+
rules: {
41+
'no-restricted-imports': [
42+
'error',
43+
...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
44+
],
45+
},
46+
},
47+
],
48+
};

Diff for: updates_and_signals/safe_message_handlers/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
lib
2+
node_modules

Diff for: updates_and_signals/safe_message_handlers/.npmrc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package-lock=false

Diff for: updates_and_signals/safe_message_handlers/.nvmrc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
16
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
To begin development, install the Temporal CLI:
2+
3+
Mac: {cyan brew install temporal}
4+
Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest
5+
6+
Start Temporal Server:
7+
8+
{cyan temporal server start-dev}
9+
10+
Use Node version 16+:
11+
12+
Mac: {cyan brew install node@16}
13+
Other: https://nodejs.org/en/download/
14+
15+
Then, in the project directory, using two other shells, run these commands:
16+
17+
{cyan npm run start.watch}
18+
{cyan npm run workflow}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lib
+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
printWidth: 120
2+
singleQuote: true

Diff for: updates_and_signals/safe_message_handlers/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
### Running this sample
2+
3+
1. `npm install` to install dependencies.
4+
1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation).
5+
1. `npm run start.watch` to start the Worker.
6+
1. In another shell, `npm run simulation` to run a simulation of the cluster manager.
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"name": "temporal-update",
3+
"version": "0.1.0",
4+
"private": true,
5+
"scripts": {
6+
"build": "tsc --build",
7+
"build.watch": "tsc --build --watch",
8+
"lint": "eslint .",
9+
"start": "ts-node src/worker.ts",
10+
"start.watch": "nodemon src/worker.ts",
11+
"simulation": "ts-node src/run-simulation.ts",
12+
"format": "prettier --config .prettierrc 'src/**/*.ts' --write",
13+
"test": "mocha --exit --require ts-node/register src/test/*.test.ts"
14+
},
15+
"nodemonConfig": {
16+
"execMap": {
17+
"ts": "ts-node"
18+
},
19+
"ext": "ts",
20+
"watch": [
21+
"src"
22+
]
23+
},
24+
"dependencies": {
25+
"@temporalio/activity": "^1.11.1",
26+
"@temporalio/client": "^1.11.1",
27+
"@temporalio/worker": "^1.11.1",
28+
"@temporalio/workflow": "^1.11.1",
29+
"async-mutex": "^0.5.0",
30+
"nanoid": "3.x"
31+
},
32+
"devDependencies": {
33+
"@temporalio/testing": "^1.11.1",
34+
"@tsconfig/node16": "^1.0.0",
35+
"@types/mocha": "8.x",
36+
"@types/node": "^16.11.43",
37+
"@typescript-eslint/eslint-plugin": "^5.0.0",
38+
"@typescript-eslint/parser": "^5.0.0",
39+
"eslint": "^7.32.0",
40+
"eslint-config-prettier": "^8.3.0",
41+
"eslint-plugin-deprecation": "^1.2.1",
42+
"mocha": "8.x",
43+
"nodemon": "^2.0.12",
44+
"prettier": "^2.8.8",
45+
"ts-node": "^10.8.1",
46+
"typescript": "^4.4.2"
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import * as activities from '@temporalio/activity';
2+
3+
interface AssignNodesToJobInput {
4+
nodes: string[];
5+
jobName: string;
6+
}
7+
8+
interface UnassignNodesForJobInput {
9+
nodes: string[];
10+
jobName: string;
11+
}
12+
13+
export async function startCluster(): Promise<void> {
14+
activities.log.info('Starting cluster');
15+
await activities.sleep(100); // Simulate RPC
16+
}
17+
18+
export async function assignNodesToJob(input: AssignNodesToJobInput): Promise<void> {
19+
activities.log.info(`Assigning nodes ${input.nodes} to job ${input.jobName}`);
20+
await activities.sleep(100); // Simulate RPC
21+
}
22+
23+
export async function unassignNodesForJob(input: UnassignNodesForJobInput): Promise<void> {
24+
activities.log.info(`Unassigning nodes ${input.nodes} from job ${input.jobName}`);
25+
await activities.sleep(100); // Simulate RPC
26+
}
+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { Connection, Client, WorkflowHandle } from '@temporalio/client';
2+
import { nanoid } from 'nanoid';
3+
4+
import { clusterManagerWorkflow } from './workflows';
5+
6+
export async function startClusterManager(): Promise<WorkflowHandle<typeof clusterManagerWorkflow>> {
7+
const connection = await Connection.connect({ address: 'localhost:7233' });
8+
const client = new Client({ connection });
9+
return client.workflow.start(clusterManagerWorkflow, {
10+
taskQueue: 'safe-message-handlers-task-queue',
11+
workflowId: `cm-${nanoid()}`,
12+
});
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import * as wf from '@temporalio/workflow';
2+
import type * as activities from './activities';
3+
import { Mutex } from 'async-mutex';
4+
import {
5+
AssignNodesToJobUpdateInput,
6+
ClusterManagerState,
7+
ClusterManagerStateSummary,
8+
DeleteJobUpdateInput,
9+
} from './types';
10+
11+
const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActivities<typeof activities>({
12+
startToCloseTimeout: '1 minute',
13+
});
14+
15+
// ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an
16+
// API to started and shutdown the cluster, to assign jobs to nodes, to delete jobs, and to query
17+
// cluster status. The workflow maps this API to Signals, Updates, and Queries. The assign and
18+
// delete operations issue an RPC changing the state of the remote cluster, and then mutate workflow
19+
// state reflecting the change made. In order that workflow state remains in sync with the true
20+
// cluster state, assign/delete operations must not be performed concurrently (i.e. they must not
21+
// "interleave" with each other; they must be "serialized"; they must be "atomic"). An async mutex
22+
// from a 3rd party library is used to ensure this.
23+
export class ClusterManager {
24+
state: ClusterManagerState;
25+
seenJobs: Set<string>;
26+
nodesMutex: Mutex;
27+
28+
constructor(state?: ClusterManagerState) {
29+
this.state = state ?? {
30+
clusterStarted: false,
31+
clusterShutdown: false,
32+
nodes: new Map<string, string | null>(),
33+
maxAssignedNodes: 0,
34+
};
35+
this.nodesMutex = new Mutex();
36+
this.seenJobs = new Set<string>();
37+
}
38+
39+
async startCluster(): Promise<void> {
40+
await startCluster();
41+
this.state.clusterStarted = true;
42+
for (let i = 0; i < 25; i++) {
43+
this.state.nodes.set(i.toString(), null);
44+
}
45+
wf.log.info('Cluster started');
46+
}
47+
48+
async shutDownCluster(): Promise<void> {
49+
await wf.condition(() => this.state.clusterStarted);
50+
this.state.clusterShutdown = true;
51+
wf.log.info('Cluster shutdown');
52+
}
53+
54+
async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise<ClusterManagerStateSummary> {
55+
await wf.condition(() => this.state.clusterStarted);
56+
if (this.state.clusterShutdown) {
57+
// If you want the client to receive a failure, either add an update validator and throw the
58+
// exception from there, or raise an ApplicationError. Other exceptions in the handler will
59+
// cause the workflow to keep retrying and get it stuck.
60+
throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down');
61+
}
62+
return await this.nodesMutex.runExclusive(async (): Promise<ClusterManagerStateSummary> => {
63+
// Idempotency guard: do nothing if the job already has nodes assigned.
64+
if (!this.seenJobs.has(input.jobName)) {
65+
const unassignedNodes = this.getUnassignedNodes();
66+
if (input.numNodes > unassignedNodes.size) {
67+
throw new wf.ApplicationFailure(
68+
`Cannot assign ${input.numNodes} nodes; have only ${unassignedNodes.size} available`
69+
);
70+
}
71+
const nodesToAssign = Array.from(unassignedNodes).slice(0, input.numNodes);
72+
// This await would be dangerous without the lock held because it would allow interleaving
73+
// with the deleteJob operation, which mutates self.state.nodes.
74+
await assignNodesToJob({ nodes: nodesToAssign, jobName: input.jobName });
75+
for (const node of nodesToAssign) {
76+
this.state.nodes.set(node, input.jobName);
77+
}
78+
this.seenJobs.add(input.jobName);
79+
this.state.maxAssignedNodes = Math.max(this.state.maxAssignedNodes, this.getAssignedNodes().size);
80+
}
81+
return this.getStateSummary();
82+
});
83+
}
84+
85+
async deleteJob(input: DeleteJobUpdateInput) {
86+
await wf.condition(() => this.state.clusterStarted);
87+
if (this.state.clusterShutdown) {
88+
// If you want the client to receive a failure, either add an update validator and throw the
89+
// exception from there, or raise an ApplicationError. Other exceptions in the handler will
90+
// cause the workflow to keep retrying and get it stuck.
91+
throw new wf.ApplicationFailure('Cannot delete job: Cluster is already shut down');
92+
}
93+
await this.nodesMutex.runExclusive(async () => {
94+
const nodesToUnassign = Array.from(this.state.nodes.entries())
95+
.filter(([_, v]) => v === input.jobName)
96+
.map(([k, _]) => k);
97+
// This await would be dangerous without the lock held because it would allow interleaving
98+
// with the assignNodesToJob operation, which mutates self.state.nodes.
99+
await unassignNodesForJob({ nodes: nodesToUnassign, jobName: input.jobName });
100+
for (const node of nodesToUnassign) {
101+
this.state.nodes.set(node, null);
102+
}
103+
});
104+
}
105+
106+
getState(): ClusterManagerState {
107+
return {
108+
clusterStarted: this.state.clusterStarted,
109+
clusterShutdown: this.state.clusterShutdown,
110+
nodes: this.state.nodes,
111+
maxAssignedNodes: this.state.maxAssignedNodes,
112+
};
113+
}
114+
115+
getStateSummary(): ClusterManagerStateSummary {
116+
return {
117+
maxAssignedNodes: this.state.maxAssignedNodes,
118+
assignedNodes: this.getAssignedNodes().size,
119+
};
120+
}
121+
122+
getUnassignedNodes(): Set<string> {
123+
return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === null));
124+
}
125+
126+
getAssignedNodes(jobName?: string): Set<string> {
127+
return new Set(
128+
Array.from(this.state.nodes.keys()).filter((key) => {
129+
const value = this.state.nodes.get(key);
130+
if (jobName === undefined) {
131+
return value !== null && value !== 'BAD!';
132+
}
133+
return value === jobName;
134+
})
135+
);
136+
}
137+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { WorkflowHandle } from '@temporalio/client';
2+
3+
import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows';
4+
import { startClusterManager } from './client';
5+
import { setTimeout } from 'timers/promises';
6+
7+
async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise<void> {
8+
await wf.signal(startClusterSignal);
9+
10+
const allocationUpdates: Promise<any>[] = [];
11+
for (let i = 0; i < 6; i++) {
12+
allocationUpdates.push(wf.executeUpdate(assignNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] }));
13+
}
14+
await Promise.all(allocationUpdates);
15+
16+
if (delaySeconds) {
17+
await setTimeout(delaySeconds * 1000);
18+
}
19+
20+
const deletionUpdates: Promise<any>[] = [];
21+
for (let i = 0; i < 6; i++) {
22+
deletionUpdates.push(wf.executeUpdate(deleteJobUpdate, { args: [{ jobName: `task-${i}` }] }));
23+
}
24+
await Promise.all(deletionUpdates);
25+
26+
await wf.signal(shutdownClusterSignal);
27+
}
28+
29+
async function main() {
30+
const workflow = await startClusterManager();
31+
await runSimulation(workflow);
32+
}
33+
34+
main().catch((err) => {
35+
console.error(err);
36+
process.exit(1);
37+
});

0 commit comments

Comments
 (0)