Skip to content

Commit 15307fe

Browse files
committed
Implement the sample for real
1 parent c53d12b commit 15307fe

File tree

11 files changed

+380
-195
lines changed

11 files changed

+380
-195
lines changed

Diff for: updates_and_signals/safe_message_handlers/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
1. `npm install` to install dependencies.
44
1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation).
55
1. `npm run start.watch` to start the Worker.
6-
1. In another shell, `npm run workflow` to run the Workflow Client.
6+
1. In another shell, `npm run simulation` to run a simulation of the cluster manager.

Diff for: updates_and_signals/safe_message_handlers/package.json

+6-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
"lint": "eslint .",
99
"start": "ts-node src/worker.ts",
1010
"start.watch": "nodemon src/worker.ts",
11-
"workflow": "ts-node src/client.ts",
11+
"simulation": "ts-node src/run-simulation.ts",
1212
"format": "prettier --config .prettierrc 'src/**/*.ts' --write",
13-
"test": "mocha --exit --require ts-node/register --require source-map-support/register src/mocha/*.test.ts"
13+
"test": "ts-node src/test.ts"
1414
},
1515
"nodemonConfig": {
1616
"execMap": {
@@ -26,13 +26,16 @@
2626
"@temporalio/client": "^1.9.0",
2727
"@temporalio/worker": "^1.9.0",
2828
"@temporalio/workflow": "^1.9.0",
29-
"nanoid": "3.x"
29+
"async-mutex": "^0.5.0",
30+
"nanoid": "3.x",
31+
"uuid": "^10.0.0"
3032
},
3133
"devDependencies": {
3234
"@temporalio/testing": "^1.9.0",
3335
"@tsconfig/node16": "^1.0.0",
3436
"@types/mocha": "8.x",
3537
"@types/node": "^16.11.43",
38+
"@types/uuid": "^10.0.0",
3639
"@typescript-eslint/eslint-plugin": "^5.0.0",
3740
"@typescript-eslint/parser": "^5.0.0",
3841
"eslint": "^7.32.0",
+14-9
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
1-
export interface AllocateNodesToJobInput {
1+
interface AssignNodesToJobInput {
22
nodes: string[];
33
jobName: string;
44
}
55

6-
export interface DeallocateNodesForJobInput {
6+
interface UnassignNodesForJobInput {
77
nodes: string[];
88
jobName: string;
99
}
1010

11-
export interface FindBadNodesInput {
11+
interface FindBadNodesInput {
1212
nodesToCheck: string[];
1313
}
1414

15-
export async function allocateNodesToJob(input: AllocateNodesToJobInput): Promise<void> {
15+
export async function assignNodesToJob(input: AssignNodesToJobInput): Promise<void> {
1616
console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`);
17-
await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation
17+
await sleep(100); // Simulate RPC
1818
}
1919

20-
export async function deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise<void> {
21-
console.log(`Deallocating nodes ${input.nodes} from job ${input.jobName}`);
22-
await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation
20+
export async function unassignNodesForJob(input: UnassignNodesForJobInput): Promise<void> {
21+
console.log(`Unassigning nodes ${input.nodes} from job ${input.jobName}`);
22+
await sleep(100); // Simulate RPC
2323
}
2424

2525
export async function findBadNodes(input: FindBadNodesInput): Promise<string[]> {
26-
await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation
26+
console.log('Finding bad nodes');
27+
await sleep(100); // Simulate RPC
2728
const badNodes = input.nodesToCheck.filter((n) => parseInt(n) % 5 === 0);
2829
if (badNodes.length) {
2930
console.log(`Found bad nodes: ${badNodes}`);
@@ -32,3 +33,7 @@ export async function findBadNodes(input: FindBadNodesInput): Promise<string[]>
3233
}
3334
return badNodes;
3435
}
36+
37+
async function sleep(ms: number): Promise<void> {
38+
await new Promise((resolve) => setTimeout(resolve, ms));
39+
}
+7-38
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,14 @@
11
import { Connection, Client, WorkflowHandle } from '@temporalio/client';
2-
import * as workflow from './workflows';
2+
import { v4 as uuid } from 'uuid';
33

4-
async function doClusterLifecycle(wf: WorkflowHandle, delaySeconds?: number): Promise<void> {
5-
await wf.signal(workflow.startClusterSignal);
4+
import { clusterManagerWorkflow } from './workflows';
65

7-
const allocationUpdates: Promise<any>[] = [];
8-
for (let i = 0; i < 6; i++) {
9-
allocationUpdates.push(
10-
wf.executeUpdate(workflow.allocateNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] })
11-
);
12-
}
13-
await Promise.all(allocationUpdates);
14-
15-
if (delaySeconds) {
16-
await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000));
17-
}
18-
19-
const deletionUpdates: Promise<any>[] = [];
20-
for (let i = 0; i < 6; i++) {
21-
deletionUpdates.push(wf.executeUpdate(workflow.deleteJobUpdate, { args: [{ jobName: `task-${i}` }] }));
22-
}
23-
await Promise.all(deletionUpdates);
24-
25-
await wf.signal(workflow.shutdownClusterSignal);
26-
}
27-
async function main() {
6+
export async function startClusterManager(): Promise<WorkflowHandle<typeof clusterManagerWorkflow>> {
287
const connection = await Connection.connect({ address: 'localhost:7233' });
298
const client = new Client({ connection });
30-
31-
// Define the workflow handle
32-
const wfHandle = await client.workflow.start(workflow.clusterManagerWorkflow, {
33-
args: [{ testContinueAsNew: true }],
34-
taskQueue: 'tq',
35-
workflowId: 'cluster-management-workflow',
9+
return client.workflow.start(clusterManagerWorkflow, {
10+
args: [{}],
11+
taskQueue: 'safe-message-handlers-task-queue',
12+
workflowId: `cluster-manager-${uuid()}`,
3613
});
37-
38-
// Start the cluster lifecycle
39-
await doClusterLifecycle(wfHandle);
4014
}
41-
42-
main().catch((err) => {
43-
console.error(err);
44-
process.exit(1);
45-
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import * as wf from '@temporalio/workflow';
2+
import type * as activities from './activities';
3+
import * as _3rdPartyAsyncMutexLibrary from 'async-mutex';
4+
import {
5+
AssignNodesToJobUpdateInput,
6+
ClusterManagerState,
7+
ClusterManagerStateSummary,
8+
DeleteJobUpdateInput,
9+
} from './types';
10+
11+
const { assignNodesToJob, unassignNodesForJob } = wf.proxyActivities<typeof activities>({
12+
startToCloseTimeout: '1 minute',
13+
});
14+
15+
const { findBadNodes } = wf.proxyActivities<typeof activities>({
16+
startToCloseTimeout: '1 minute',
17+
retry: {
18+
// This activity is called with the nodexMutex held. We do not retry, since retries would block
19+
// cluster operations.
20+
maximumAttempts: 1,
21+
},
22+
});
23+
24+
// ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an
25+
// API to started and shutdown the cluster, to assign jobs to nodes, and to delete jobs. The
26+
// workflow maps this API to signals and updates. Operations altering node assignments must not
27+
// interleave (must be serialized), and a standard (non-Temporal-specific) async mutex from a 3rd
28+
// party library is used to ensure this.
29+
export class ClusterManager {
30+
state: ClusterManagerState;
31+
jobsWithNodesAssigned: Set<string>;
32+
nodesMutex: _3rdPartyAsyncMutexLibrary.Mutex;
33+
34+
constructor(state?: ClusterManagerState) {
35+
this.state = state ?? {
36+
clusterStarted: false,
37+
clusterShutdown: false,
38+
nodes: new Map<string, string | null>(),
39+
maxAssignedNodes: 0,
40+
};
41+
this.jobsWithNodesAssigned = new Set<string>();
42+
this.nodesMutex = new _3rdPartyAsyncMutexLibrary.Mutex();
43+
}
44+
45+
startCluster(): void {
46+
this.state.clusterStarted = true;
47+
for (let i = 0; i < 25; i++) {
48+
this.state.nodes.set(i.toString(), null);
49+
}
50+
wf.log.info('Cluster started');
51+
}
52+
53+
async shutDownCluster(): Promise<void> {
54+
await wf.condition(() => this.state.clusterStarted);
55+
this.state.clusterShutdown = true;
56+
wf.log.info('Cluster shutdown');
57+
}
58+
59+
async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise<ClusterManagerStateSummary> {
60+
await wf.condition(() => this.state.clusterStarted);
61+
if (this.state.clusterShutdown) {
62+
// If you want the client to receive a failure, either add an update validator and throw the
63+
// exception from there, or raise an ApplicationError. Other exceptions in the handler will
64+
// cause the workflow to keep retrying and get it stuck.
65+
throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down');
66+
}
67+
return await this.nodesMutex.runExclusive(async (): Promise<ClusterManagerStateSummary> => {
68+
// Idempotency guard: do nothing if the job already has nodes assigned.
69+
if (!new Set(this.state.nodes.values()).has(input.jobName)) {
70+
const unassignedNodes = this.getUnassignedNodes();
71+
if (input.numNodes > unassignedNodes.size) {
72+
throw new wf.ApplicationFailure(
73+
`Cannot assign ${input.numNodes} nodes; have only ${unassignedNodes.size} available`
74+
);
75+
}
76+
const nodesToAssign = Array.from(unassignedNodes).slice(0, input.numNodes);
77+
// This await would be dangerous without the lock held because it would allow interleaving
78+
// with the deleteJob and performHealthCheck operations, both of which mutate
79+
// self.state.nodes.
80+
await assignNodesToJob({ nodes: nodesToAssign, jobName: input.jobName });
81+
for (const node of nodesToAssign) {
82+
this.state.nodes.set(node, input.jobName);
83+
}
84+
this.state.maxAssignedNodes = Math.max(this.state.maxAssignedNodes, this.getAssignedNodes().size);
85+
}
86+
return this.getStateSummary();
87+
});
88+
}
89+
90+
async deleteJob(input: DeleteJobUpdateInput) {
91+
await wf.condition(() => this.state.clusterStarted);
92+
if (this.state.clusterShutdown) {
93+
// If you want the client to receive a failure, either add an update validator and throw the
94+
// exception from there, or raise an ApplicationError. Other exceptions in the handler will
95+
// cause the workflow to keep retrying and get it stuck.
96+
throw new wf.ApplicationFailure('Cannot delete job: Cluster is already shut down');
97+
}
98+
await this.nodesMutex.runExclusive(async () => {
99+
const nodesToUnassign = Array.from(this.state.nodes.entries())
100+
.filter(([_, v]) => v === input.jobName)
101+
.map(([k, _]) => k);
102+
// This await would be dangerous without the lock held because it would allow interleaving
103+
// with the assignNodesToJob and performHealthCheck operations, both of which mutate
104+
// self.state.nodes.
105+
await unassignNodesForJob({ nodes: nodesToUnassign, jobName: input.jobName });
106+
for (const node of nodesToUnassign) {
107+
this.state.nodes.set(node, null);
108+
}
109+
});
110+
}
111+
112+
async performHealthChecks(): Promise<void> {
113+
wf.log.info('performHealthChecks');
114+
await this.nodesMutex.runExclusive(async () => {
115+
const badNodes = await findBadNodes({ nodesToCheck: Array.from(this.getAssignedNodes()) });
116+
for (const node of badNodes) {
117+
this.state.nodes.set(node, 'BAD!');
118+
}
119+
});
120+
}
121+
122+
getState(): ClusterManagerState {
123+
return {
124+
clusterStarted: this.state.clusterStarted,
125+
clusterShutdown: this.state.clusterShutdown,
126+
nodes: this.state.nodes,
127+
maxAssignedNodes: this.state.maxAssignedNodes,
128+
};
129+
}
130+
131+
getStateSummary(): ClusterManagerStateSummary {
132+
return {
133+
maxAssignedNodes: this.state.maxAssignedNodes,
134+
assignedNodes: this.getAssignedNodes().size,
135+
badNodes: this.getBadNodes().size,
136+
};
137+
}
138+
139+
getUnassignedNodes(): Set<string> {
140+
return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === null));
141+
}
142+
143+
getBadNodes(): Set<string> {
144+
return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === 'BAD!'));
145+
}
146+
147+
getAssignedNodes(jobName?: string): Set<string> {
148+
return new Set(
149+
Array.from(this.state.nodes.keys()).filter((key) => {
150+
const value = this.state.nodes.get(key);
151+
return jobName ? value === jobName : value !== null && value !== 'BAD!';
152+
})
153+
);
154+
}
155+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { WorkflowHandle } from '@temporalio/client';
2+
3+
import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows';
4+
import { startClusterManager } from './client';
5+
6+
async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise<void> {
7+
await wf.signal(startClusterSignal);
8+
9+
const allocationUpdates: Promise<any>[] = [];
10+
for (let i = 0; i < 6; i++) {
11+
allocationUpdates.push(wf.executeUpdate(assignNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] }));
12+
}
13+
await Promise.all(allocationUpdates);
14+
15+
if (delaySeconds) {
16+
await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000));
17+
}
18+
19+
const deletionUpdates: Promise<any>[] = [];
20+
for (let i = 0; i < 6; i++) {
21+
deletionUpdates.push(wf.executeUpdate(deleteJobUpdate, { args: [{ jobName: `task-${i}` }] }));
22+
}
23+
await Promise.all(deletionUpdates);
24+
25+
await wf.signal(shutdownClusterSignal);
26+
}
27+
28+
async function main() {
29+
const workflow = await startClusterManager();
30+
await runSimulation(workflow);
31+
}
32+
33+
main().catch((err) => {
34+
console.error(err);
35+
process.exit(1);
36+
});
+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import {
2+
assignNodesToJobUpdate,
3+
startClusterSignal,
4+
shutdownClusterSignal,
5+
deleteJobUpdate,
6+
getClusterStatusQuery,
7+
} from './workflows';
8+
import { startClusterManager } from './client';
9+
import assert from 'assert';
10+
import { ClusterManagerStateSummary } from './types';
11+
12+
async function testClusterManager() {
13+
const workflow = await startClusterManager();
14+
await workflow.signal(startClusterSignal);
15+
const request1 = {
16+
numNodes: 5,
17+
jobName: 'job1',
18+
};
19+
20+
// Use an update to assign nodes.
21+
const updateResult1 = await workflow.executeUpdate(assignNodesToJobUpdate, {
22+
args: [request1],
23+
});
24+
assert.equal(updateResult1.assignedNodes, request1.numNodes);
25+
assert.equal(updateResult1.maxAssignedNodes, request1.numNodes);
26+
27+
// Assign nodes to a job and then delete it
28+
const request2 = {
29+
numNodes: 6,
30+
jobName: 'job2',
31+
};
32+
const updateResult2 = await workflow.executeUpdate(assignNodesToJobUpdate, {
33+
args: [request2],
34+
});
35+
assert.equal(updateResult2.assignedNodes, request1.numNodes + request2.numNodes);
36+
assert.equal(updateResult2.maxAssignedNodes, request1.numNodes + request2.numNodes);
37+
38+
await workflow.executeUpdate(deleteJobUpdate, { args: [{ jobName: 'job2' }] });
39+
40+
// The delete doesn't return anything; use the query to get current cluster state
41+
const queryResult = await workflow.query(getClusterStatusQuery);
42+
assert.equal(
43+
queryResult.assignedNodes,
44+
request1.numNodes,
45+
`expected ${request1.numNodes} left after deleting ${request2.numNodes}`
46+
);
47+
assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes);
48+
49+
// Terminate the workflow and check that workflow returns same value as obtained from last query.
50+
await workflow.signal(shutdownClusterSignal);
51+
const wfResult = await workflow.result();
52+
assert.deepEqual(wfResult, queryResult);
53+
}
54+
55+
async function runTests() {
56+
for (const fn of [testClusterManager]) {
57+
console.log(fn.name);
58+
await fn();
59+
}
60+
}
61+
62+
runTests().catch((err) => {
63+
console.error(err);
64+
process.exit(1);
65+
});

0 commit comments

Comments
 (0)