diff --git a/.husky/pre-commit b/.husky/pre-push similarity index 100% rename from .husky/pre-commit rename to .husky/pre-push diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index bf1f908b..157e8db1 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -38,6 +38,7 @@ "timer-examples", "timer-progress", "update", + "updates_and_signals", "vscode-debugger", "worker-specific-task-queues", "worker-versioning" diff --git a/updates_and_signals/safe_message_handlers/.eslintignore b/updates_and_signals/safe_message_handlers/.eslintignore new file mode 100644 index 00000000..7bd99a41 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.eslintrc.js b/updates_and_signals/safe_message_handlers/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/updates_and_signals/safe_message_handlers/.gitignore b/updates_and_signals/safe_message_handlers/.gitignore new file mode 100644 index 00000000..a9f4ed54 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.npmrc b/updates_and_signals/safe_message_handlers/.npmrc new file mode 100644 index 00000000..9cf94950 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.nvmrc b/updates_and_signals/safe_message_handlers/.nvmrc new file mode 100644 index 00000000..b6a7d89c --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.nvmrc @@ -0,0 +1 @@ +16 diff --git a/updates_and_signals/safe_message_handlers/.post-create b/updates_and_signals/safe_message_handlers/.post-create new file mode 100644 index 00000000..a682bb78 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.post-create @@ -0,0 +1,18 @@ +To begin development, install the Temporal CLI: + + Mac: {cyan brew install temporal} + Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + + {cyan temporal server start-dev} + +Use Node version 16+: + + Mac: {cyan brew install node@16} + Other: https://nodejs.org/en/download/ + +Then, in the project directory, using two other shells, run these commands: + + {cyan npm run start.watch} + {cyan npm run workflow} diff --git a/updates_and_signals/safe_message_handlers/.prettierignore b/updates_and_signals/safe_message_handlers/.prettierignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.prettierignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.prettierrc b/updates_and_signals/safe_message_handlers/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/updates_and_signals/safe_message_handlers/README.md b/updates_and_signals/safe_message_handlers/README.md new file mode 100644 index 00000000..521f5f3d --- /dev/null +++ b/updates_and_signals/safe_message_handlers/README.md @@ -0,0 +1,6 @@ +### Running this sample + +1. `npm install` to install dependencies. +1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). +1. `npm run start.watch` to start the Worker. +1. In another shell, `npm run simulation` to run a simulation of the cluster manager. diff --git a/updates_and_signals/safe_message_handlers/package.json b/updates_and_signals/safe_message_handlers/package.json new file mode 100644 index 00000000..443bbf93 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/package.json @@ -0,0 +1,48 @@ +{ + "name": "temporal-update", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "lint": "eslint .", + "start": "ts-node src/worker.ts", + "start.watch": "nodemon src/worker.ts", + "simulation": "ts-node src/run-simulation.ts", + "format": "prettier --config .prettierrc 'src/**/*.ts' --write", + "test": "mocha --exit --require ts-node/register src/test/*.test.ts" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.11.1", + "@temporalio/client": "^1.11.1", + "@temporalio/worker": "^1.11.1", + "@temporalio/workflow": "^1.11.1", + "async-mutex": "^0.5.0", + "nanoid": "3.x" + }, + "devDependencies": { + "@temporalio/testing": "^1.11.1", + "@tsconfig/node16": "^1.0.0", + "@types/mocha": "8.x", + "@types/node": "^16.11.43", + "@typescript-eslint/eslint-plugin": "^5.0.0", + "@typescript-eslint/parser": "^5.0.0", + "eslint": "^7.32.0", + "eslint-config-prettier": "^8.3.0", + "eslint-plugin-deprecation": "^1.2.1", + "mocha": "8.x", + "nodemon": "^2.0.12", + "prettier": "^2.8.8", + "ts-node": "^10.8.1", + "typescript": "^4.4.2" + } +} diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts new file mode 100644 index 00000000..1e2399f0 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -0,0 +1,26 @@ +import * as activities from '@temporalio/activity'; + +interface AssignNodesToJobInput { + nodes: string[]; + jobName: string; +} + +interface UnassignNodesForJobInput { + nodes: string[]; + jobName: string; +} + +export async function startCluster(): Promise { + activities.log.info('Starting cluster'); + await activities.sleep(100); // Simulate RPC +} + +export async function assignNodesToJob(input: AssignNodesToJobInput): Promise { + activities.log.info(`Assigning nodes ${input.nodes} to job ${input.jobName}`); + await activities.sleep(100); // Simulate RPC +} + +export async function unassignNodesForJob(input: UnassignNodesForJobInput): Promise { + activities.log.info(`Unassigning nodes ${input.nodes} from job ${input.jobName}`); + await activities.sleep(100); // Simulate RPC +} diff --git a/updates_and_signals/safe_message_handlers/src/client.ts b/updates_and_signals/safe_message_handlers/src/client.ts new file mode 100644 index 00000000..a42b3965 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/client.ts @@ -0,0 +1,13 @@ +import { Connection, Client, WorkflowHandle } from '@temporalio/client'; +import { nanoid } from 'nanoid'; + +import { clusterManagerWorkflow } from './workflows'; + +export async function startClusterManager(): Promise> { + const connection = await Connection.connect({ address: 'localhost:7233' }); + const client = new Client({ connection }); + return client.workflow.start(clusterManagerWorkflow, { + taskQueue: 'safe-message-handlers-task-queue', + workflowId: `cm-${nanoid()}`, + }); +} diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts new file mode 100644 index 00000000..3af5616f --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -0,0 +1,137 @@ +import * as wf from '@temporalio/workflow'; +import type * as activities from './activities'; +import { Mutex } from 'async-mutex'; +import { + AssignNodesToJobUpdateInput, + ClusterManagerState, + ClusterManagerStateSummary, + DeleteJobUpdateInput, +} from './types'; + +const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActivities({ + startToCloseTimeout: '1 minute', +}); + +// ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an +// API to started and shutdown the cluster, to assign jobs to nodes, to delete jobs, and to query +// cluster status. The workflow maps this API to Signals, Updates, and Queries. The assign and +// delete operations issue an RPC changing the state of the remote cluster, and then mutate workflow +// state reflecting the change made. In order that workflow state remains in sync with the true +// cluster state, assign/delete operations must not be performed concurrently (i.e. they must not +// "interleave" with each other; they must be "serialized"; they must be "atomic"). An async mutex +// from a 3rd party library is used to ensure this. +export class ClusterManager { + state: ClusterManagerState; + seenJobs: Set; + nodesMutex: Mutex; + + constructor(state?: ClusterManagerState) { + this.state = state ?? { + clusterStarted: false, + clusterShutdown: false, + nodes: new Map(), + maxAssignedNodes: 0, + }; + this.nodesMutex = new Mutex(); + this.seenJobs = new Set(); + } + + async startCluster(): Promise { + await startCluster(); + this.state.clusterStarted = true; + for (let i = 0; i < 25; i++) { + this.state.nodes.set(i.toString(), null); + } + wf.log.info('Cluster started'); + } + + async shutDownCluster(): Promise { + await wf.condition(() => this.state.clusterStarted); + this.state.clusterShutdown = true; + wf.log.info('Cluster shutdown'); + } + + async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise { + await wf.condition(() => this.state.clusterStarted); + if (this.state.clusterShutdown) { + // If you want the client to receive a failure, either add an update validator and throw the + // exception from there, or raise an ApplicationError. Other exceptions in the handler will + // cause the workflow to keep retrying and get it stuck. + throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down'); + } + return await this.nodesMutex.runExclusive(async (): Promise => { + // Idempotency guard: do nothing if the job already has nodes assigned. + if (!this.seenJobs.has(input.jobName)) { + const unassignedNodes = this.getUnassignedNodes(); + if (input.numNodes > unassignedNodes.size) { + throw new wf.ApplicationFailure( + `Cannot assign ${input.numNodes} nodes; have only ${unassignedNodes.size} available` + ); + } + const nodesToAssign = Array.from(unassignedNodes).slice(0, input.numNodes); + // This await would be dangerous without the lock held because it would allow interleaving + // with the deleteJob operation, which mutates self.state.nodes. + await assignNodesToJob({ nodes: nodesToAssign, jobName: input.jobName }); + for (const node of nodesToAssign) { + this.state.nodes.set(node, input.jobName); + } + this.seenJobs.add(input.jobName); + this.state.maxAssignedNodes = Math.max(this.state.maxAssignedNodes, this.getAssignedNodes().size); + } + return this.getStateSummary(); + }); + } + + async deleteJob(input: DeleteJobUpdateInput) { + await wf.condition(() => this.state.clusterStarted); + if (this.state.clusterShutdown) { + // If you want the client to receive a failure, either add an update validator and throw the + // exception from there, or raise an ApplicationError. Other exceptions in the handler will + // cause the workflow to keep retrying and get it stuck. + throw new wf.ApplicationFailure('Cannot delete job: Cluster is already shut down'); + } + await this.nodesMutex.runExclusive(async () => { + const nodesToUnassign = Array.from(this.state.nodes.entries()) + .filter(([_, v]) => v === input.jobName) + .map(([k, _]) => k); + // This await would be dangerous without the lock held because it would allow interleaving + // with the assignNodesToJob operation, which mutates self.state.nodes. + await unassignNodesForJob({ nodes: nodesToUnassign, jobName: input.jobName }); + for (const node of nodesToUnassign) { + this.state.nodes.set(node, null); + } + }); + } + + getState(): ClusterManagerState { + return { + clusterStarted: this.state.clusterStarted, + clusterShutdown: this.state.clusterShutdown, + nodes: this.state.nodes, + maxAssignedNodes: this.state.maxAssignedNodes, + }; + } + + getStateSummary(): ClusterManagerStateSummary { + return { + maxAssignedNodes: this.state.maxAssignedNodes, + assignedNodes: this.getAssignedNodes().size, + }; + } + + getUnassignedNodes(): Set { + return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === null)); + } + + getAssignedNodes(jobName?: string): Set { + return new Set( + Array.from(this.state.nodes.keys()).filter((key) => { + const value = this.state.nodes.get(key); + if (jobName === undefined) { + return value !== null && value !== 'BAD!'; + } + return value === jobName; + }) + ); + } +} diff --git a/updates_and_signals/safe_message_handlers/src/run-simulation.ts b/updates_and_signals/safe_message_handlers/src/run-simulation.ts new file mode 100644 index 00000000..9096c888 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/run-simulation.ts @@ -0,0 +1,37 @@ +import { WorkflowHandle } from '@temporalio/client'; + +import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows'; +import { startClusterManager } from './client'; +import { setTimeout } from 'timers/promises'; + +async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise { + await wf.signal(startClusterSignal); + + const allocationUpdates: Promise[] = []; + for (let i = 0; i < 6; i++) { + allocationUpdates.push(wf.executeUpdate(assignNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] })); + } + await Promise.all(allocationUpdates); + + if (delaySeconds) { + await setTimeout(delaySeconds * 1000); + } + + const deletionUpdates: Promise[] = []; + for (let i = 0; i < 6; i++) { + deletionUpdates.push(wf.executeUpdate(deleteJobUpdate, { args: [{ jobName: `task-${i}` }] })); + } + await Promise.all(deletionUpdates); + + await wf.signal(shutdownClusterSignal); +} + +async function main() { + const workflow = await startClusterManager(); + await runSimulation(workflow); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts b/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts new file mode 100644 index 00000000..383328a1 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts @@ -0,0 +1,89 @@ +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { before, describe, it } from 'mocha'; +import { bundleWorkflowCode, WorkflowBundleWithSourceMap, DefaultLogger, Runtime, Worker } from '@temporalio/worker'; +import * as activities from '../activities'; +import { + clusterManagerWorkflow, + assignNodesToJobUpdate, + startClusterSignal, + shutdownClusterSignal, + deleteJobUpdate, + getClusterStatusQuery, +} from '../workflows'; +import { nanoid } from 'nanoid'; +import assert from 'assert'; + +const taskQueue = 'test' + new Date().toLocaleDateString('en-US'); + +describe('cluster manager', function () { + this.timeout(10000); + let worker: Worker; + let env: TestWorkflowEnvironment; + let workflowBundle: WorkflowBundleWithSourceMap; + + before(async function () { + Runtime.install({ logger: new DefaultLogger('WARN') }); + env = await TestWorkflowEnvironment.createLocal(); + + workflowBundle = await bundleWorkflowCode({ + workflowsPath: require.resolve('../workflows'), + logger: new DefaultLogger('WARN'), + }); + }); + + beforeEach(async function () { + worker = await Worker.create({ + connection: env.nativeConnection, + workflowBundle, + activities, + taskQueue, + }); + }); + + after(async function () { + await env.teardown(); + }); + + it('successfully completes a session', async function () { + await worker.runUntil(async function () { + const workflow = await env.client.workflow.start(clusterManagerWorkflow, { + taskQueue, + workflowId: `cluster-manager-${nanoid()}`, + }); + await workflow.signal(startClusterSignal); + const request1 = { + numNodes: 5, + jobName: 'job1', + }; + // Use an update to assign nodes. + const updateResult1 = await workflow.executeUpdate(assignNodesToJobUpdate, { + args: [request1], + }); + assert.equal(updateResult1.assignedNodes, request1.numNodes); + assert.equal(updateResult1.maxAssignedNodes, request1.numNodes); + // Assign nodes to a job and then delete it + const request2 = { + numNodes: 6, + jobName: 'job2', + }; + const updateResult2 = await workflow.executeUpdate(assignNodesToJobUpdate, { + args: [request2], + }); + assert.equal(updateResult2.assignedNodes, request1.numNodes + request2.numNodes); + assert.equal(updateResult2.maxAssignedNodes, request1.numNodes + request2.numNodes); + await workflow.executeUpdate(deleteJobUpdate, { args: [{ jobName: 'job2' }] }); + // The delete doesn't return anything; use the query to get current cluster state + const queryResult = await workflow.query(getClusterStatusQuery); + assert.equal( + queryResult.assignedNodes, + request1.numNodes, + `expected ${request1.numNodes} left after deleting ${request2.numNodes}` + ); + assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes); + // Terminate the workflow and check that workflow returns same value as obtained from last query. + await workflow.signal(shutdownClusterSignal); + const wfResult = await workflow.result(); + assert.deepEqual(wfResult, queryResult); + }); + }); +}); diff --git a/updates_and_signals/safe_message_handlers/src/types.ts b/updates_and_signals/safe_message_handlers/src/types.ts new file mode 100644 index 00000000..6454944b --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/types.ts @@ -0,0 +1,30 @@ +export interface ClusterManagerState { + clusterStarted: boolean; + clusterShutdown: boolean; + nodes: Map; + maxAssignedNodes: number; +} + +export interface ClusterManagerInput { + state?: ClusterManagerState; +} + +export interface ClusterManagerStateSummary { + maxAssignedNodes: number; + assignedNodes: number; +} + +export interface AssignNodesToJobUpdateInput { + numNodes: number; + jobName: string; +} + +export interface DeleteJobUpdateInput { + jobName: string; +} + +export interface ClusterManagerWorkflowResult { + maxAssignedNodes: number; + numCurrentlyAssignedNodes: number; + numBadNodes: number; +} diff --git a/updates_and_signals/safe_message_handlers/src/worker.ts b/updates_and_signals/safe_message_handlers/src/worker.ts new file mode 100644 index 00000000..e65d1e56 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/worker.ts @@ -0,0 +1,18 @@ +import { Worker } from '@temporalio/worker'; +import path from 'path'; +import * as activities from './activities'; + +async function run() { + const worker = await Worker.create({ + workflowsPath: path.join(__dirname, './workflows.ts'), + activities, + taskQueue: 'safe-message-handlers-task-queue', + }); + + await worker.run(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts new file mode 100644 index 00000000..5605f3ae --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -0,0 +1,65 @@ +import * as wf from '@temporalio/workflow'; +import { ClusterManager } from './cluster-manager'; +import { + AssignNodesToJobUpdateInput, + ClusterManagerInput, + ClusterManagerStateSummary, + DeleteJobUpdateInput, +} from './types'; + +export const startClusterSignal = wf.defineSignal('startCluster'); +export const shutdownClusterSignal = wf.defineSignal('shutdownCluster'); +export const assignNodesToJobUpdate = wf.defineUpdate( + 'allocateNodesToJob' +); +export const deleteJobUpdate = wf.defineUpdate('deleteJob'); +export const getClusterStatusQuery = wf.defineQuery('getClusterStatus'); + +export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): Promise { + const manager = new ClusterManager(input.state); + // + // Message-handling API + // + wf.setHandler(startClusterSignal, () => manager.startCluster()); + wf.setHandler(shutdownClusterSignal, () => manager.shutDownCluster()); + + // This is an update as opposed to a signal because the client may want to wait for nodes to be + // allocated before sending work to those nodes. Returns the array of node names that were + // allocated to the job. + wf.setHandler(assignNodesToJobUpdate, (input) => manager.assignNodesToJob(input), { + validator: async (input: AssignNodesToJobUpdateInput): Promise => { + if (input.numNodes <= 0) { + throw new Error(`numNodes must be positive (got ${input.numNodes})`); + } + if (input.jobName === '') { + throw new Error('jobName cannot be empty'); + } + }, + }); + + // Even though it returns nothing, this is an update because the client may want to track it, for + // example to wait for nodes to be unassigned before reassigning them. + wf.setHandler(deleteJobUpdate, (input) => manager.deleteJob(input)); + wf.setHandler(getClusterStatusQuery, () => manager.getStateSummary()); + + // + // Main workflow logic + // + // The cluster manager workflow is a long-running workflow ("entity" workflow). Most of its logic + // lies in the message-processing handlers implented in the ClusterManager class. The main + // workflow itself simply waits until the cluster is shutdown, or the workflow needs to + // continue-as-new. + await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); + if (!manager.state.clusterShutdown) { + // You should typically wait for all async handlers to finish before + // completing a workflow or continuing as new. If the main workflow method + // is scheduling activities or child workflows, then you should typically + // also arrange that they are completed before completing or continuing as + // new. This sample does not schedule any activities or child workflows, so + // it is sufficient just to wait for handlers to finish. + await wf.condition(wf.allHandlersFinished); + return await wf.continueAsNew({ state: manager.getState() }); + } else { + return manager.getStateSummary(); + } +} diff --git a/updates_and_signals/tsconfig.json b/updates_and_signals/tsconfig.json new file mode 100644 index 00000000..6ff187f6 --- /dev/null +++ b/updates_and_signals/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@tsconfig/node16/tsconfig.json", + "version": "4.4.2", + "compilerOptions": { + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src/**/*.ts"] +}