Skip to content

Message handlers sample #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
1 change: 1 addition & 0 deletions .scripts/list-of-samples.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"timer-examples",
"timer-progress",
"update",
"updates_and_signals",
"vscode-debugger",
"worker-specific-task-queues",
"worker-versioning"
Expand Down
3 changes: 3 additions & 0 deletions updates_and_signals/safe_message_handlers/.eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
lib
.eslintrc.js
48 changes: 48 additions & 0 deletions updates_and_signals/safe_message_handlers/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const { builtinModules } = require('module');

const ALLOWED_NODE_BUILTINS = new Set(['assert']);

module.exports = {
root: true,
parser: '@typescript-eslint/parser',
parserOptions: {
project: './tsconfig.json',
tsconfigRootDir: __dirname,
},
plugins: ['@typescript-eslint', 'deprecation'],
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
],
rules: {
// recommended for safety
'@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
'deprecation/deprecation': 'warn',

// code style preference
'object-shorthand': ['error', 'always'],

// relaxed rules, for convenience
'@typescript-eslint/no-unused-vars': [
'warn',
{
argsIgnorePattern: '^_',
varsIgnorePattern: '^_',
},
],
'@typescript-eslint/no-explicit-any': 'off',
},
overrides: [
{
files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'],
rules: {
'no-restricted-imports': [
'error',
...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
],
},
},
],
};
2 changes: 2 additions & 0 deletions updates_and_signals/safe_message_handlers/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
lib
node_modules
1 change: 1 addition & 0 deletions updates_and_signals/safe_message_handlers/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package-lock=false
1 change: 1 addition & 0 deletions updates_and_signals/safe_message_handlers/.nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
16
18 changes: 18 additions & 0 deletions updates_and_signals/safe_message_handlers/.post-create
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
To begin development, install the Temporal CLI:

Mac: {cyan brew install temporal}
Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest

Start Temporal Server:

{cyan temporal server start-dev}

Use Node version 16+:

Mac: {cyan brew install node@16}
Other: https://nodejs.org/en/download/

Then, in the project directory, using two other shells, run these commands:

{cyan npm run start.watch}
{cyan npm run simulation}
1 change: 1 addition & 0 deletions updates_and_signals/safe_message_handlers/.prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lib
2 changes: 2 additions & 0 deletions updates_and_signals/safe_message_handlers/.prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
printWidth: 120
singleQuote: true
6 changes: 6 additions & 0 deletions updates_and_signals/safe_message_handlers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Running this sample

1. `npm install` to install dependencies.
1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation).
1. `npm run start.watch` to start the Worker.
1. In another shell, `npm run simulation` to run a simulation of the cluster manager.
48 changes: 48 additions & 0 deletions updates_and_signals/safe_message_handlers/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"name": "temporal-update",
"version": "0.1.0",
"private": true,
"scripts": {
"build": "tsc --build",
"build.watch": "tsc --build --watch",
"lint": "eslint .",
"start": "ts-node src/worker.ts",
"start.watch": "nodemon src/worker.ts",
"simulation": "ts-node src/run-simulation.ts",
"format": "prettier --config .prettierrc 'src/**/*.ts' --write",
"test": "mocha --exit --require ts-node/register src/test/*.test.ts"
},
"nodemonConfig": {
"execMap": {
"ts": "ts-node"
},
"ext": "ts",
"watch": [
"src"
]
},
"dependencies": {
"@temporalio/activity": "^1.11.1",
"@temporalio/client": "^1.11.1",
"@temporalio/worker": "^1.11.1",
"@temporalio/workflow": "^1.11.1",
"async-mutex": "^0.5.0",
"nanoid": "3.x"
},
"devDependencies": {
"@temporalio/testing": "^1.11.1",
"@tsconfig/node16": "^1.0.0",
"@types/mocha": "8.x",
"@types/node": "^16.11.43",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^7.32.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-deprecation": "^1.2.1",
"mocha": "8.x",
"nodemon": "^2.0.12",
"prettier": "^2.8.8",
"ts-node": "^10.8.1",
"typescript": "^4.4.2"
}
}
26 changes: 26 additions & 0 deletions updates_and_signals/safe_message_handlers/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import * as activities from '@temporalio/activity';

/** Argument accepted by the `assignNodesToJob` activity. */
interface AssignNodesToJobInput {
// Nodes to hand over to the job.
nodes: string[];
// Name of the job receiving the nodes.
jobName: string;
}

/** Argument accepted by the `unassignNodesForJob` activity. */
interface UnassignNodesForJobInput {
// Nodes to take back from the job.
nodes: string[];
// Name of the job the nodes are being released from.
jobName: string;
}

/**
 * Activity that stands in for the remote "start cluster" call: it logs the
 * request and then sleeps briefly to simulate the RPC.
 */
export async function startCluster(): Promise<void> {
  const simulatedRpcLatencyMs = 100;
  activities.log.info('Starting cluster');
  await activities.sleep(simulatedRpcLatencyMs);
}

/**
 * Activity that stands in for the remote "assign nodes" call: it logs which
 * nodes go to which job and then sleeps briefly to simulate the RPC.
 *
 * @param input - The nodes to assign and the name of the job receiving them.
 */
export async function assignNodesToJob(input: AssignNodesToJobInput): Promise<void> {
  const simulatedRpcLatencyMs = 100;
  const { nodes, jobName } = input;
  activities.log.info(`Assigning nodes ${nodes} to job ${jobName}`);
  await activities.sleep(simulatedRpcLatencyMs);
}

/**
 * Activity that stands in for the remote "unassign nodes" call: it logs which
 * nodes are released from which job and then sleeps briefly to simulate the RPC.
 *
 * @param input - The nodes to release and the name of the job they belonged to.
 */
export async function unassignNodesForJob(input: UnassignNodesForJobInput): Promise<void> {
  const simulatedRpcLatencyMs = 100;
  const { nodes, jobName } = input;
  activities.log.info(`Unassigning nodes ${nodes} from job ${jobName}`);
  await activities.sleep(simulatedRpcLatencyMs);
}
13 changes: 13 additions & 0 deletions updates_and_signals/safe_message_handlers/src/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Connection, Client, WorkflowHandle } from '@temporalio/client';
import { nanoid } from 'nanoid';

import { clusterManagerWorkflow } from './workflows';

/**
 * Connects to the Temporal server at `localhost:7233` and starts a new
 * cluster-manager workflow on the sample's task queue.
 *
 * @returns A handle to the started workflow, whose ID is `cm-` followed by a random nanoid.
 */
export async function startClusterManager(): Promise<WorkflowHandle<typeof clusterManagerWorkflow>> {
  const connection = await Connection.connect({ address: 'localhost:7233' });
  const { workflow } = new Client({ connection });
  const startOptions = {
    taskQueue: 'safe-message-handlers-task-queue',
    workflowId: `cm-${nanoid()}`,
  };
  return workflow.start(clusterManagerWorkflow, startOptions);
}
137 changes: 137 additions & 0 deletions updates_and_signals/safe_message_handlers/src/cluster-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import * as wf from '@temporalio/workflow';
import type * as activities from './activities';
import { Mutex } from 'async-mutex';
import {
AssignNodesToJobUpdateInput,
ClusterManagerState,
ClusterManagerStateSummary,
DeleteJobUpdateInput,
} from './types';

// Workflow-side stubs for the activities in ./activities; every call made
// through them uses a one-minute start-to-close timeout.
const activityStubs = wf.proxyActivities<typeof activities>({ startToCloseTimeout: '1 minute' });
const { assignNodesToJob, unassignNodesForJob, startCluster } = activityStubs;

// ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an
// API to start and shut down the cluster, to assign jobs to nodes, to delete jobs, and to query
// cluster status. The workflow maps this API to Signals, Updates, and Queries. The assign and
// delete operations issue an RPC changing the state of the remote cluster, and then mutate workflow
// state reflecting the change made. In order that workflow state remains in sync with the true
// cluster state, assign/delete operations must not be performed concurrently (i.e. they must not
// "interleave" with each other; they must be "serialized"; they must be "atomic"). An async mutex
// from a 3rd party library is used to ensure this.
/**
 * Workflow-side bookkeeping for a cluster of nodes and the jobs they are assigned to.
 *
 * Assign and delete operations each call an activity and then mutate `state.nodes`;
 * both run under `nodesMutex` so that they can never interleave with each other.
 */
export class ClusterManager {
  /** Cluster lifecycle flags plus the node -> job assignment map (`null` means unassigned). */
  state: ClusterManagerState;
  /**
   * Names of jobs that have already been given nodes; used as the idempotency guard in
   * `assignNodesToJob`. Always starts empty, even when `state` is supplied to the constructor.
   */
  seenJobs: Set<string>;
  /** Serializes the assign/delete operations. */
  nodesMutex: Mutex;

  /**
   * @param state - Existing state to continue from. When omitted, the manager starts with
   *   a cluster that is neither started nor shut down and has no nodes.
   */
  constructor(state?: ClusterManagerState) {
    this.state = state ?? {
      clusterStarted: false,
      clusterShutdown: false,
      nodes: new Map<string, string | null>(),
      maxAssignedNodes: 0,
    };
    this.nodesMutex = new Mutex();
    this.seenJobs = new Set<string>();
  }

  /** Runs the `startCluster` activity, marks the cluster started and registers nodes "0".."24" as unassigned. */
  async startCluster(): Promise<void> {
    await startCluster();
    this.state.clusterStarted = true;
    for (let i = 0; i < 25; i++) {
      this.state.nodes.set(i.toString(), null);
    }
    wf.log.info('Cluster started');
  }

  /** Waits until the cluster has been started, then marks it shut down. */
  async shutDownCluster(): Promise<void> {
    await wf.condition(() => this.state.clusterStarted);
    this.state.clusterShutdown = true;
    wf.log.info('Cluster shutdown');
  }

  /**
   * Assigns `input.numNodes` currently-unassigned nodes to `input.jobName`.
   *
   * The call is idempotent per job name: if the job has been seen before, nothing is
   * assigned and the current summary is returned.
   *
   * @param input - Job name and the number of nodes it needs.
   * @returns A summary of the assignment state after the operation.
   * @throws wf.ApplicationFailure if the cluster is already shut down, if `numNodes` is not a
   *   non-negative integer, or if fewer than `numNodes` nodes are unassigned.
   */
  async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise<ClusterManagerStateSummary> {
    await wf.condition(() => this.state.clusterStarted);
    if (this.state.clusterShutdown) {
      // If you want the client to receive a failure, either add an update validator and throw
      // the exception from there, or throw an ApplicationFailure. Other exceptions in the
      // handler will cause the workflow to keep retrying and get it stuck.
      throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down');
    }
    return await this.nodesMutex.runExclusive(async (): Promise<ClusterManagerStateSummary> => {
      // Idempotency guard: do nothing if the job already has nodes assigned.
      if (!this.seenJobs.has(input.jobName)) {
        // Reject counts that `slice` would silently reinterpret: a negative count would
        // assign nearly every free node, and a fractional or NaN count would be truncated.
        if (!Number.isInteger(input.numNodes) || input.numNodes < 0) {
          throw new wf.ApplicationFailure(
            `Cannot assign ${input.numNodes} nodes; the number of nodes must be a non-negative integer`
          );
        }
        const unassignedNodes = this.getUnassignedNodes();
        if (input.numNodes > unassignedNodes.size) {
          throw new wf.ApplicationFailure(
            `Cannot assign ${input.numNodes} nodes; have only ${unassignedNodes.size} available`
          );
        }
        const nodesToAssign = Array.from(unassignedNodes).slice(0, input.numNodes);
        // This await would be dangerous without the lock held because it would allow
        // interleaving with the deleteJob operation, which mutates this.state.nodes.
        await assignNodesToJob({ nodes: nodesToAssign, jobName: input.jobName });
        for (const node of nodesToAssign) {
          this.state.nodes.set(node, input.jobName);
        }
        this.seenJobs.add(input.jobName);
        this.state.maxAssignedNodes = Math.max(this.state.maxAssignedNodes, this.getAssignedNodes().size);
      }
      return this.getStateSummary();
    });
  }

  /**
   * Releases every node currently assigned to `input.jobName`.
   *
   * The job name is not removed from `seenJobs`, so a later `assignNodesToJob` call for the
   * same name remains a no-op.
   *
   * @param input - Name of the job to delete.
   * @throws wf.ApplicationFailure if the cluster is already shut down.
   */
  async deleteJob(input: DeleteJobUpdateInput): Promise<void> {
    await wf.condition(() => this.state.clusterStarted);
    if (this.state.clusterShutdown) {
      // If you want the client to receive a failure, either add an update validator and throw
      // the exception from there, or throw an ApplicationFailure. Other exceptions in the
      // handler will cause the workflow to keep retrying and get it stuck.
      throw new wf.ApplicationFailure('Cannot delete job: Cluster is already shut down');
    }
    await this.nodesMutex.runExclusive(async () => {
      const nodesToUnassign = Array.from(this.state.nodes.entries())
        .filter(([, job]) => job === input.jobName)
        .map(([node]) => node);
      // This await would be dangerous without the lock held because it would allow
      // interleaving with the assignNodesToJob operation, which mutates this.state.nodes.
      await unassignNodesForJob({ nodes: nodesToUnassign, jobName: input.jobName });
      for (const node of nodesToUnassign) {
        this.state.nodes.set(node, null);
      }
    });
  }

  /**
   * Returns a new state object with the current field values. The copy is shallow:
   * `nodes` is the same Map instance the manager keeps mutating.
   */
  getState(): ClusterManagerState {
    return {
      clusterStarted: this.state.clusterStarted,
      clusterShutdown: this.state.clusterShutdown,
      nodes: this.state.nodes,
      maxAssignedNodes: this.state.maxAssignedNodes,
    };
  }

  /** Returns the recorded `maxAssignedNodes` value and the number of currently assigned nodes. */
  getStateSummary(): ClusterManagerStateSummary {
    return {
      maxAssignedNodes: this.state.maxAssignedNodes,
      assignedNodes: this.getAssignedNodes().size,
    };
  }

  /** Returns the nodes whose assignment is `null`. */
  getUnassignedNodes(): Set<string> {
    return new Set(
      Array.from(this.state.nodes.entries())
        .filter(([, job]) => job === null)
        .map(([node]) => node)
    );
  }

  /**
   * Returns the nodes assigned to `jobName`, or to any job when `jobName` is omitted.
   *
   * @param jobName - Restrict the result to nodes assigned to this job.
   */
  getAssignedNodes(jobName?: string): Set<string> {
    return new Set(
      Array.from(this.state.nodes.entries())
        .filter(([, job]) => {
          if (jobName === undefined) {
            // NOTE(review): 'BAD!' is never written to state.nodes in this file; presumably a
            // sentinel for unhealthy nodes set elsewhere — confirm before relying on it.
            return job !== null && job !== 'BAD!';
          }
          return job === jobName;
        })
        .map(([node]) => node)
    );
  }
}
37 changes: 37 additions & 0 deletions updates_and_signals/safe_message_handlers/src/run-simulation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { WorkflowHandle } from '@temporalio/client';

import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows';
import { startClusterManager } from './client';
import { setTimeout } from 'timers/promises';

/**
 * Drives one scripted run against a cluster-manager workflow: start the cluster,
 * assign two nodes to each of six jobs concurrently, optionally pause, delete the
 * same six jobs concurrently, then shut the cluster down.
 *
 * @param wf - Handle of the workflow to send signals and updates to.
 * @param delaySeconds - Optional pause, in seconds, between the assignments and the deletions.
 */
async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise<void> {
  const jobNames = Array.from({ length: 6 }, (_, i) => `task-${i}`);

  await wf.signal(startClusterSignal);

  // All assignment updates are issued before any of them is awaited.
  await Promise.all(
    jobNames.map((jobName) => wf.executeUpdate(assignNodesToJobUpdate, { args: [{ numNodes: 2, jobName }] }))
  );

  if (delaySeconds) {
    await setTimeout(delaySeconds * 1000);
  }

  // Likewise, all deletion updates are issued before any of them is awaited.
  await Promise.all(jobNames.map((jobName) => wf.executeUpdate(deleteJobUpdate, { args: [{ jobName }] })));

  await wf.signal(shutdownClusterSignal);
}

/** Starts a cluster-manager workflow and runs the simulation against it. */
async function main() {
  await runSimulation(await startClusterManager());
}

// Report any failure and exit with a non-zero status.
main().catch((error) => {
  console.error(error);
  process.exit(1);
});
Loading
Loading