From cf47d7c02fbdbca8a18a8d2f5c7f346fa6f94a6e Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 1 Jul 2024 05:42:15 -0400 Subject: [PATCH 01/21] Do husky actions pre-push instead of pre-commit --- .husky/{pre-commit => pre-push} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .husky/{pre-commit => pre-push} (100%) diff --git a/.husky/pre-commit b/.husky/pre-push similarity index 100% rename from .husky/pre-commit rename to .husky/pre-push From acf8c1c818263b606c99a1609fb9a1b2afe1bca7 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 2 Jul 2024 17:25:40 -0400 Subject: [PATCH 02/21] Copilot Workspace experiment: ask it to translate https://github.com/temporalio/samples-python/pull/123 --- .../safe_message_handlers/src/activities.ts | 33 ++++++ .../safe_message_handlers/src/starter.ts | 20 ++++ .../safe_message_handlers/src/worker.ts | 17 +++ .../safe_message_handlers/src/workflow.ts | 102 ++++++++++++++++++ .../src/workflow_test.ts | 28 +++++ 5 files changed, 200 insertions(+) create mode 100644 updates_and_signals/safe_message_handlers/src/activities.ts create mode 100644 updates_and_signals/safe_message_handlers/src/starter.ts create mode 100644 updates_and_signals/safe_message_handlers/src/worker.ts create mode 100644 updates_and_signals/safe_message_handlers/src/workflow.ts create mode 100644 updates_and_signals/safe_message_handlers/src/workflow_test.ts diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts new file mode 100644 index 00000000..497fab4e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -0,0 +1,33 @@ +import { proxyActivities } from '@temporalio/workflow'; +import { ActivityInput as AllocateNodesToJobInput } from './interfaces'; + +// Activities with TypeScript syntax and Temporal TypeScript SDK specifics +const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{ + allocateNodesToJob(input: AllocateNodesToJobInput): Promise; + deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise; + findBadNodes(input: FindBadNodesInput): Promise; +}>({ + startToCloseTimeout: '1 minute', +}); + +export async function allocateNodesToJob(input: AllocateNodesToJobInput): Promise { + console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); + await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation +} + +export async function deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise { + console.log(`Deallocating nodes ${input.nodes} from job ${input.jobName}`); + await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation +} + +export async function findBadNodes(input: FindBadNodesInput): Promise { + await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation + const badNodes = input.nodesToCheck.filter(n => parseInt(n) % 5 === 0); + if (badNodes.length) { + console.log(`Found bad nodes: ${badNodes}`); + } else { + console.log("No new bad nodes found."); + } + return badNodes; +} + diff --git a/updates_and_signals/safe_message_handlers/src/starter.ts b/updates_and_signals/safe_message_handlers/src/starter.ts new file mode 100644 index 00000000..21a7b55e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/starter.ts @@ -0,0 +1,20 @@ +import { WorkflowClient } from '@temporalio/client'; +import { ClusterManagerWorkflow } from './workflow'; +import { doClusterLifecycle } from './utils'; + +async function main() { + const client = new WorkflowClient(); + + // Define the workflow handle + const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, { + workflowId: 'cluster-management-workflow', + }); + + // Start the cluster lifecycle + await doClusterLifecycle(workflow); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/worker.ts b/updates_and_signals/safe_message_handlers/src/worker.ts new file mode 100644 index 00000000..dc3baa79 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/worker.ts @@ -0,0 +1,17 @@ +import { Worker } from '@temporalio/worker'; +import path from 'path'; + +async function run() { + const worker = await Worker.create({ + workflowsPath: path.join(__dirname, './workflows'), + activitiesPath: path.join(__dirname, './activities'), + taskQueue: 'safe-message-handlers-task-queue', + }); + + await worker.run(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/workflow.ts b/updates_and_signals/safe_message_handlers/src/workflow.ts new file mode 100644 index 00000000..ca984b7c --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/workflow.ts @@ -0,0 +1,102 @@ +// This file contains the TypeScript port of the Python workflow for managing cluster updates and signals + +import { proxyActivities, defineSignal, defineQuery, setHandler, condition, sleep, defineWorkflow } from '@temporalio/workflow'; +import type { AllocateNodesToJobInput, DeallocateNodesForJobInput, FindBadNodesInput } from './interfaces'; + +// Define signals +const startClusterSignal = defineSignal('startCluster'); +const shutdownClusterSignal = defineSignal('shutdownCluster'); +const allocateNodesToJobSignal = defineSignal<[AllocateNodesToJobInput]>('allocateNodesToJob'); +const deallocateNodesForJobSignal = defineSignal<[DeallocateNodesForJobInput]>('deallocateNodesForJob'); + +// Define queries +const getClusterStatusQuery = defineQuery<{}>('getClusterStatus'); + +// Define activities +const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{ + allocateNodesToJob(input: AllocateNodesToJobInput): Promise; + deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise; + findBadNodes(input: FindBadNodesInput): Promise; +}>({ + startToCloseTimeout: '1 minute', +}); + +// Define workflow interface +export interface ClusterManagerWorkflow { + run(input: ClusterManagerWorkflowInput): Promise; +} + +// Define workflow input and result types +export interface ClusterManagerWorkflowInput { + testContinueAsNew: boolean; +} + +export interface ClusterManagerWorkflowResult { + maxAssignedNodes: number; + numCurrentlyAssignedNodes: number; + numBadNodes: number; +} + +// Workflow implementation +export const clusterManagerWorkflow: ClusterManagerWorkflow = defineWorkflow({ + async run(input: ClusterManagerWorkflowInput) { + let state = { + clusterStarted: false, + clusterShutdown: false, + nodes: {} as Record, + jobsAdded: new Set(), + maxAssignedNodes: 0, + }; + + // Signal handlers + setHandler(startClusterSignal, () => { + state.clusterStarted = true; + for (let i = 0; i < 25; i++) { + state.nodes[i.toString()] = null; + } + }); + + setHandler(shutdownClusterSignal, () => { + state.clusterShutdown = true; + }); + + setHandler(allocateNodesToJobSignal, async (input: AllocateNodesToJobInput) => { + if (!state.clusterStarted || state.clusterShutdown) { + throw new Error('Cluster is not in a valid state for node allocation'); + } + // Allocate nodes to job logic + }); + + setHandler(deallocateNodesForJobSignal, async (input: DeallocateNodesForJobInput) => { + if (!state.clusterStarted || state.clusterShutdown) { + throw new Error('Cluster is not in a valid state for node deallocation'); + } + // Deallocate nodes from job logic + }); + + // Query handler + setHandler(getClusterStatusQuery, () => { + return { + clusterStarted: state.clusterStarted, + clusterShutdown: state.clusterShutdown, + numNodes: Object.keys(state.nodes).length, + numAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length, + }; + }); + + // Main workflow logic + await condition(() => state.clusterStarted, 'Waiting for cluster to start'); + // Perform operations while cluster is active + while (!state.clusterShutdown) { + // Example: perform periodic health checks + await sleep(60000); // Sleep for 60 seconds + } + + // Return workflow result + return { + maxAssignedNodes: state.maxAssignedNodes, + numCurrentlyAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length, + numBadNodes: Object.values(state.nodes).filter(n => n === 'BAD').length, + }; + }, +}); diff --git a/updates_and_signals/safe_message_handlers/src/workflow_test.ts b/updates_and_signals/safe_message_handlers/src/workflow_test.ts new file mode 100644 index 00000000..6f95198e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/workflow_test.ts @@ -0,0 +1,28 @@ +import { WorkflowClient } from '@temporalio/client'; +import { ClusterManagerWorkflow } from './workflow'; +import { v4 as uuidv4 } from 'uuid'; + +async function run() { + const client = new WorkflowClient(); + + // Define the workflow handle + const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, { + workflowId: `cluster-management-workflow-${uuidv4()}`, + }); + + // Test workflow functionality + await workflow.start(); + await workflow.signal.startCluster(); + await workflow.executeUpdate('allocateNodesToJob', { + numNodes: 5, + jobName: 'job1', + }); + await workflow.signal.shutdownCluster(); + const result = await workflow.result(); + console.log('Workflow result:', result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); From 171321b655c8a4586531f7a8bee373d668ea940e Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 2 Jul 2024 17:26:25 -0400 Subject: [PATCH 03/21] boilerplate --- .scripts/list-of-samples.json | 1 + .../safe_message_handlers/.eslintignore | 3 ++ .../safe_message_handlers/.eslintrc.js | 48 +++++++++++++++++++ .../safe_message_handlers/.gitignore | 2 + .../safe_message_handlers/.npmrc | 1 + .../safe_message_handlers/.nvmrc | 1 + .../safe_message_handlers/.post-create | 18 +++++++ .../safe_message_handlers/.prettierignore | 1 + .../safe_message_handlers/.prettierrc | 2 + .../safe_message_handlers/README.md | 6 +++ .../safe_message_handlers/package.json | 47 ++++++++++++++++++ updates_and_signals/tsconfig.json | 12 +++++ 12 files changed, 142 insertions(+) create mode 100644 updates_and_signals/safe_message_handlers/.eslintignore create mode 100644 updates_and_signals/safe_message_handlers/.eslintrc.js create mode 100644 updates_and_signals/safe_message_handlers/.gitignore create mode 100644 updates_and_signals/safe_message_handlers/.npmrc create mode 100644 updates_and_signals/safe_message_handlers/.nvmrc create mode 100644 updates_and_signals/safe_message_handlers/.post-create create mode 100644 updates_and_signals/safe_message_handlers/.prettierignore create mode 100644 updates_and_signals/safe_message_handlers/.prettierrc create mode 100644 updates_and_signals/safe_message_handlers/README.md create mode 100644 updates_and_signals/safe_message_handlers/package.json create mode 100644 updates_and_signals/tsconfig.json diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index bf1f908b..157e8db1 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -38,6 +38,7 @@ "timer-examples", "timer-progress", "update", + "updates_and_signals", "vscode-debugger", "worker-specific-task-queues", "worker-versioning" diff --git a/updates_and_signals/safe_message_handlers/.eslintignore b/updates_and_signals/safe_message_handlers/.eslintignore new file mode 100644 index 00000000..7bd99a41 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.eslintrc.js b/updates_and_signals/safe_message_handlers/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/updates_and_signals/safe_message_handlers/.gitignore b/updates_and_signals/safe_message_handlers/.gitignore new file mode 100644 index 00000000..a9f4ed54 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.npmrc b/updates_and_signals/safe_message_handlers/.npmrc new file mode 100644 index 00000000..9cf94950 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.nvmrc b/updates_and_signals/safe_message_handlers/.nvmrc new file mode 100644 index 00000000..b6a7d89c --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.nvmrc @@ -0,0 +1 @@ +16 diff --git a/updates_and_signals/safe_message_handlers/.post-create b/updates_and_signals/safe_message_handlers/.post-create new file mode 100644 index 00000000..a682bb78 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.post-create @@ -0,0 +1,18 @@ +To begin development, install the Temporal CLI: + + Mac: {cyan brew install temporal} + Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + + {cyan temporal server start-dev} + +Use Node version 16+: + + Mac: {cyan brew install node@16} + Other: https://nodejs.org/en/download/ + +Then, in the project directory, using two other shells, run these commands: + + {cyan npm run start.watch} + {cyan npm run workflow} diff --git a/updates_and_signals/safe_message_handlers/.prettierignore b/updates_and_signals/safe_message_handlers/.prettierignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.prettierignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/updates_and_signals/safe_message_handlers/.prettierrc b/updates_and_signals/safe_message_handlers/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/updates_and_signals/safe_message_handlers/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/updates_and_signals/safe_message_handlers/README.md b/updates_and_signals/safe_message_handlers/README.md new file mode 100644 index 00000000..3b57f1e8 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/README.md @@ -0,0 +1,6 @@ +### Running this sample + +1. `npm install` to install dependencies. +1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). +1. `npm run start.watch` to start the Worker. +1. In another shell, `npm run workflow` to run the Workflow Client. diff --git a/updates_and_signals/safe_message_handlers/package.json b/updates_and_signals/safe_message_handlers/package.json new file mode 100644 index 00000000..b0568354 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/package.json @@ -0,0 +1,47 @@ +{ + "name": "temporal-update", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "lint": "eslint .", + "start": "ts-node src/worker.ts", + "start.watch": "nodemon src/worker.ts", + "workflow": "ts-node src/client.ts", + "format": "prettier --config .prettierrc 'src/**/*.ts' --write", + "test": "mocha --exit --require ts-node/register --require source-map-support/register src/mocha/*.test.ts" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.9.0", + "@temporalio/client": "^1.9.0", + "@temporalio/worker": "^1.9.0", + "@temporalio/workflow": "^1.9.0", + "nanoid": "3.x" + }, + "devDependencies": { + "@temporalio/testing": "^1.9.0", + "@tsconfig/node16": "^1.0.0", + "@types/mocha": "8.x", + "@types/node": "^16.11.43", + "@typescript-eslint/eslint-plugin": "^5.0.0", + "@typescript-eslint/parser": "^5.0.0", + "eslint": "^7.32.0", + "eslint-config-prettier": "^8.3.0", + "eslint-plugin-deprecation": "^1.2.1", + "mocha": "8.x", + "nodemon": "^2.0.12", + "prettier": "^2.8.8", + "ts-node": "^10.8.1", + "typescript": "^4.4.2" + } +} diff --git a/updates_and_signals/tsconfig.json b/updates_and_signals/tsconfig.json new file mode 100644 index 00000000..6ff187f6 --- /dev/null +++ b/updates_and_signals/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@tsconfig/node16/tsconfig.json", + "version": "4.4.2", + "compilerOptions": { + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src/**/*.ts"] +} From 9bab0dd2ff1a91afd6a8c2ce622bb41a274fbfc9 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 1 Jul 2024 05:40:01 -0400 Subject: [PATCH 04/21] Make Copilot Workspace output compile --- .../safe_message_handlers/src/activities.ts | 33 ++--- .../safe_message_handlers/src/client.ts | 45 +++++++ .../safe_message_handlers/src/starter.ts | 20 --- .../safe_message_handlers/src/worker.ts | 3 +- .../safe_message_handlers/src/workflow.ts | 102 -------------- .../safe_message_handlers/src/workflows.ts | 124 ++++++++++++++++++ 6 files changed, 187 insertions(+), 140 deletions(-) create mode 100644 updates_and_signals/safe_message_handlers/src/client.ts delete mode 100644 updates_and_signals/safe_message_handlers/src/starter.ts delete mode 100644 updates_and_signals/safe_message_handlers/src/workflow.ts create mode 100644 updates_and_signals/safe_message_handlers/src/workflows.ts diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts index 497fab4e..204b6edf 100644 --- a/updates_and_signals/safe_message_handlers/src/activities.ts +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -1,33 +1,34 @@ -import { proxyActivities } from '@temporalio/workflow'; -import { ActivityInput as AllocateNodesToJobInput } from './interfaces'; +export interface AllocateNodesToJobInput { + nodes: string[]; + jobName: string; +} -// Activities with TypeScript syntax and Temporal TypeScript SDK specifics -const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{ - allocateNodesToJob(input: AllocateNodesToJobInput): Promise; - deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise; - findBadNodes(input: FindBadNodesInput): Promise; -}>({ - startToCloseTimeout: '1 minute', -}); +export interface DeallocateNodesForJobInput { + nodes: string[]; + jobName: string; +} + +export interface FindBadNodesInput { + nodesToCheck: string[]; +} export async function allocateNodesToJob(input: AllocateNodesToJobInput): Promise { console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); - await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation + await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation } export async function deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise { console.log(`Deallocating nodes ${input.nodes} from job ${input.jobName}`); - await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation + await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation } export async function findBadNodes(input: FindBadNodesInput): Promise { - await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation - const badNodes = input.nodesToCheck.filter(n => parseInt(n) % 5 === 0); + await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation + const badNodes = input.nodesToCheck.filter((n) => parseInt(n) % 5 === 0); if (badNodes.length) { console.log(`Found bad nodes: ${badNodes}`); } else { - console.log("No new bad nodes found."); + console.log('No new bad nodes found.'); } return badNodes; } - diff --git a/updates_and_signals/safe_message_handlers/src/client.ts b/updates_and_signals/safe_message_handlers/src/client.ts new file mode 100644 index 00000000..80f947bd --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/client.ts @@ -0,0 +1,45 @@ +import { Connection, Client, WorkflowHandle } from '@temporalio/client'; +import * as workflow from './workflows'; + +async function doClusterLifecycle(wf: WorkflowHandle, delaySeconds?: number): Promise { + await wf.signal(workflow.startClusterSignal); + + const allocationUpdates: Promise[] = []; + for (let i = 0; i < 6; i++) { + allocationUpdates.push( + wf.executeUpdate(workflow.allocateNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] }) + ); + } + await Promise.all(allocationUpdates); + + if (delaySeconds) { + await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000)); + } + + const deletionUpdates: Promise[] = []; + for (let i = 0; i < 6; i++) { + deletionUpdates.push(wf.executeUpdate(workflow.deleteJobUpdate, { args: [{ jobName: `task-${i}` }] })); + } + await Promise.all(deletionUpdates); + + await wf.signal(workflow.shutdownClusterSignal); +} +async function main() { + const connection = await Connection.connect({ address: 'localhost:7233' }); + const client = new Client({ connection }); + + // Define the workflow handle + const wfHandle = await client.workflow.start(workflow.clusterManagerWorkflow, { + args: [{ testContinueAsNew: true }], + taskQueue: 'tq', + workflowId: 'cluster-management-workflow', + }); + + // Start the cluster lifecycle + await doClusterLifecycle(wfHandle); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/starter.ts b/updates_and_signals/safe_message_handlers/src/starter.ts deleted file mode 100644 index 21a7b55e..00000000 --- a/updates_and_signals/safe_message_handlers/src/starter.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { WorkflowClient } from '@temporalio/client'; -import { ClusterManagerWorkflow } from './workflow'; -import { doClusterLifecycle } from './utils'; - -async function main() { - const client = new WorkflowClient(); - - // Define the workflow handle - const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, { - workflowId: 'cluster-management-workflow', - }); - - // Start the cluster lifecycle - await doClusterLifecycle(workflow); -} - -main().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/updates_and_signals/safe_message_handlers/src/worker.ts b/updates_and_signals/safe_message_handlers/src/worker.ts index dc3baa79..d82929b7 100644 --- a/updates_and_signals/safe_message_handlers/src/worker.ts +++ b/updates_and_signals/safe_message_handlers/src/worker.ts @@ -3,8 +3,7 @@ import path from 'path'; async function run() { const worker = await Worker.create({ - workflowsPath: path.join(__dirname, './workflows'), - activitiesPath: path.join(__dirname, './activities'), + workflowsPath: path.join(__dirname, './workflows.ts'), taskQueue: 'safe-message-handlers-task-queue', }); diff --git a/updates_and_signals/safe_message_handlers/src/workflow.ts b/updates_and_signals/safe_message_handlers/src/workflow.ts deleted file mode 100644 index ca984b7c..00000000 --- a/updates_and_signals/safe_message_handlers/src/workflow.ts +++ /dev/null @@ -1,102 +0,0 @@ -// This file contains the TypeScript port of the Python workflow for managing cluster updates and signals - -import { proxyActivities, defineSignal, defineQuery, setHandler, condition, sleep, defineWorkflow } from '@temporalio/workflow'; -import type { AllocateNodesToJobInput, DeallocateNodesForJobInput, FindBadNodesInput } from './interfaces'; - -// Define signals -const startClusterSignal = defineSignal('startCluster'); -const shutdownClusterSignal = defineSignal('shutdownCluster'); -const allocateNodesToJobSignal = defineSignal<[AllocateNodesToJobInput]>('allocateNodesToJob'); -const deallocateNodesForJobSignal = defineSignal<[DeallocateNodesForJobInput]>('deallocateNodesForJob'); - -// Define queries -const getClusterStatusQuery = defineQuery<{}>('getClusterStatus'); - -// Define activities -const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{ - allocateNodesToJob(input: AllocateNodesToJobInput): Promise; - deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise; - findBadNodes(input: FindBadNodesInput): Promise; -}>({ - startToCloseTimeout: '1 minute', -}); - -// Define workflow interface -export interface ClusterManagerWorkflow { - run(input: ClusterManagerWorkflowInput): Promise; -} - -// Define workflow input and result types -export interface ClusterManagerWorkflowInput { - testContinueAsNew: boolean; -} - -export interface ClusterManagerWorkflowResult { - maxAssignedNodes: number; - numCurrentlyAssignedNodes: number; - numBadNodes: number; -} - -// Workflow implementation -export const clusterManagerWorkflow: ClusterManagerWorkflow = defineWorkflow({ - async run(input: ClusterManagerWorkflowInput) { - let state = { - clusterStarted: false, - clusterShutdown: false, - nodes: {} as Record, - jobsAdded: new Set(), - maxAssignedNodes: 0, - }; - - // Signal handlers - setHandler(startClusterSignal, () => { - state.clusterStarted = true; - for (let i = 0; i < 25; i++) { - state.nodes[i.toString()] = null; - } - }); - - setHandler(shutdownClusterSignal, () => { - state.clusterShutdown = true; - }); - - setHandler(allocateNodesToJobSignal, async (input: AllocateNodesToJobInput) => { - if (!state.clusterStarted || state.clusterShutdown) { - throw new Error('Cluster is not in a valid state for node allocation'); - } - // Allocate nodes to job logic - }); - - setHandler(deallocateNodesForJobSignal, async (input: DeallocateNodesForJobInput) => { - if (!state.clusterStarted || state.clusterShutdown) { - throw new Error('Cluster is not in a valid state for node deallocation'); - } - // Deallocate nodes from job logic - }); - - // Query handler - setHandler(getClusterStatusQuery, () => { - return { - clusterStarted: state.clusterStarted, - clusterShutdown: state.clusterShutdown, - numNodes: Object.keys(state.nodes).length, - numAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length, - }; - }); - - // Main workflow logic - await condition(() => state.clusterStarted, 'Waiting for cluster to start'); - // Perform operations while cluster is active - while (!state.clusterShutdown) { - // Example: perform periodic health checks - await sleep(60000); // Sleep for 60 seconds - } - - // Return workflow result - return { - maxAssignedNodes: state.maxAssignedNodes, - numCurrentlyAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length, - numBadNodes: Object.values(state.nodes).filter(n => n === 'BAD').length, - }; - }, -}); diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts new file mode 100644 index 00000000..2c349c37 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -0,0 +1,124 @@ +// This file contains the TypeScript port of the Python workflow for managing cluster updates and signals + +import { + proxyActivities, + defineSignal, + defineUpdate, + defineQuery, + setHandler, + condition, + sleep, +} from '@temporalio/workflow'; +import type * as activities from './activities'; + +interface ClusterManagerState { + clusterStarted: boolean; + clusterShutdown: boolean; + nodes: { [key: string]: string | null }; + jobsAdded: Set; + maxAssignedNodes: number; +} + +interface ClusterManagerInput { + state?: ClusterManagerState; + testContinueAsNew: boolean; +} + +interface ClusterManagerResult { + maxAssignedNodes: number; + numCurrentlyAssignedNodes: number; + numBadNodes: number; +} + +export interface AllocateNodesToJobInput { + numNodes: number; + jobName: string; +} + +interface DeleteJobInput { + jobName: string; +} + +export interface ClusterManagerWorkflowInput { + testContinueAsNew: boolean; +} + +export interface ClusterManagerWorkflowResult { + maxAssignedNodes: number; + numCurrentlyAssignedNodes: number; + numBadNodes: number; +} + +// Message-handling API +export const startClusterSignal = defineSignal('startCluster'); +export const shutdownClusterSignal = defineSignal('shutdownCluster'); +export const allocateNodesToJobUpdate = defineUpdate('allocateNodesToJob'); +export const deleteJobUpdate = defineUpdate('deleteJob'); +const getClusterStatusQuery = defineQuery<{}>('getClusterStatus'); + +// Activities +const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities({ + startToCloseTimeout: '1 minute', +}); + +export async function clusterManagerWorkflow(input: ClusterManagerWorkflowInput) { + let state = { + clusterStarted: false, + clusterShutdown: false, + nodes: {} as Record, + jobsAdded: new Set(), + maxAssignedNodes: 0, + }; + + // Signal handlers + setHandler(startClusterSignal, () => { + state.clusterStarted = true; + for (let i = 0; i < 25; i++) { + state.nodes[i.toString()] = null; + } + }); + + setHandler(shutdownClusterSignal, () => { + state.clusterShutdown = true; + }); + + setHandler(allocateNodesToJobUpdate, async (input: AllocateNodesToJobInput): Promise => { + if (!state.clusterStarted || state.clusterShutdown) { + throw new Error('Cluster is not in a valid state for node allocation'); + } + // Allocate nodes to job logic + return []; + }); + + setHandler(deleteJobUpdate, async (input: DeleteJobInput) => { + if (!state.clusterStarted || state.clusterShutdown) { + throw new Error('Cluster is not in a valid state for node deallocation'); + } + // Deallocate nodes from job logic + }); + + // Query handler + setHandler(getClusterStatusQuery, () => { + return { + clusterStarted: state.clusterStarted, + clusterShutdown: state.clusterShutdown, + numNodes: Object.keys(state.nodes).length, + numAssignedNodes: Object.values(state.nodes).filter((n) => n !== null).length, + }; + }); + + // Main workflow logic + await condition(() => state.clusterStarted, 'Waiting for cluster to start'); + // Perform operations while cluster is active + while (!state.clusterShutdown) { + // Example: perform periodic health checks + await sleep(60000); // Sleep for 60 seconds + } + + // Return workflow result + return { + maxAssignedNodes: state.maxAssignedNodes, + numCurrentlyAssignedNodes: Object.values(state.nodes).filter((n) => n !== null).length, + numBadNodes: Object.values(state.nodes).filter((n) => n === 'BAD').length, + }; +} From 4a6b327272af10fefd0c6d2c43ae791d6f910da1 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 2 Jul 2024 04:21:46 -0400 Subject: [PATCH 05/21] Implement the sample for real --- .../safe_message_handlers/README.md | 2 +- .../safe_message_handlers/package.json | 9 +- .../safe_message_handlers/src/activities.ts | 23 ++- .../safe_message_handlers/src/client.ts | 45 +---- .../src/cluster-manager.ts | 155 +++++++++++++++ .../src/run-simulation.ts | 36 ++++ .../safe_message_handlers/src/test.ts | 65 +++++++ .../safe_message_handlers/src/types.ts | 31 +++ .../safe_message_handlers/src/worker.ts | 2 + .../src/workflow_test.ts | 28 --- .../safe_message_handlers/src/workflows.ts | 179 ++++++------------ 11 files changed, 380 insertions(+), 195 deletions(-) create mode 100644 updates_and_signals/safe_message_handlers/src/cluster-manager.ts create mode 100644 updates_and_signals/safe_message_handlers/src/run-simulation.ts create mode 100644 updates_and_signals/safe_message_handlers/src/test.ts create mode 100644 updates_and_signals/safe_message_handlers/src/types.ts delete mode 100644 updates_and_signals/safe_message_handlers/src/workflow_test.ts diff --git a/updates_and_signals/safe_message_handlers/README.md b/updates_and_signals/safe_message_handlers/README.md index 3b57f1e8..521f5f3d 100644 --- a/updates_and_signals/safe_message_handlers/README.md +++ b/updates_and_signals/safe_message_handlers/README.md @@ -3,4 +3,4 @@ 1. `npm install` to install dependencies. 1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). 1. `npm run start.watch` to start the Worker. -1. In another shell, `npm run workflow` to run the Workflow Client. +1. In another shell, `npm run simulation` to run a simulation of the cluster manager. diff --git a/updates_and_signals/safe_message_handlers/package.json b/updates_and_signals/safe_message_handlers/package.json index b0568354..14efb0c1 100644 --- a/updates_and_signals/safe_message_handlers/package.json +++ b/updates_and_signals/safe_message_handlers/package.json @@ -8,9 +8,9 @@ "lint": "eslint .", "start": "ts-node src/worker.ts", "start.watch": "nodemon src/worker.ts", - "workflow": "ts-node src/client.ts", + "simulation": "ts-node src/run-simulation.ts", "format": "prettier --config .prettierrc 'src/**/*.ts' --write", - "test": "mocha --exit --require ts-node/register --require source-map-support/register src/mocha/*.test.ts" + "test": "ts-node src/test.ts" }, "nodemonConfig": { "execMap": { @@ -26,13 +26,16 @@ "@temporalio/client": "^1.9.0", "@temporalio/worker": "^1.9.0", "@temporalio/workflow": "^1.9.0", - "nanoid": "3.x" + "async-mutex": "^0.5.0", + "nanoid": "3.x", + "uuid": "^10.0.0" }, "devDependencies": { "@temporalio/testing": "^1.9.0", "@tsconfig/node16": "^1.0.0", "@types/mocha": "8.x", "@types/node": "^16.11.43", + "@types/uuid": "^10.0.0", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint": "^7.32.0", diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts index 204b6edf..6745102c 100644 --- a/updates_and_signals/safe_message_handlers/src/activities.ts +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -1,29 +1,30 @@ -export interface AllocateNodesToJobInput { +interface AssignNodesToJobInput { nodes: string[]; jobName: string; } -export interface DeallocateNodesForJobInput { +interface UnassignNodesForJobInput { nodes: string[]; jobName: string; } -export interface FindBadNodesInput { +interface FindBadNodesInput { nodesToCheck: string[]; } -export async function allocateNodesToJob(input: AllocateNodesToJobInput): Promise { +export async function assignNodesToJob(input: AssignNodesToJobInput): Promise { console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); - await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation + await sleep(100); // Simulate RPC } -export async function deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise { - console.log(`Deallocating nodes ${input.nodes} from job ${input.jobName}`); - await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation +export async function unassignNodesForJob(input: UnassignNodesForJobInput): Promise { + console.log(`Unassigning nodes ${input.nodes} from job ${input.jobName}`); + await sleep(100); // Simulate RPC } export async function findBadNodes(input: FindBadNodesInput): Promise { - await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate async operation + console.log('Finding bad nodes'); + await sleep(100); // Simulate RPC const badNodes = input.nodesToCheck.filter((n) => parseInt(n) % 5 === 0); if (badNodes.length) { console.log(`Found bad nodes: ${badNodes}`); @@ -32,3 +33,7 @@ export async function findBadNodes(input: FindBadNodesInput): Promise } return badNodes; } + +async function sleep(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/updates_and_signals/safe_message_handlers/src/client.ts b/updates_and_signals/safe_message_handlers/src/client.ts index 80f947bd..d9a42e39 100644 --- a/updates_and_signals/safe_message_handlers/src/client.ts +++ b/updates_and_signals/safe_message_handlers/src/client.ts @@ -1,45 +1,14 @@ import { Connection, Client, WorkflowHandle } from '@temporalio/client'; -import * as workflow from './workflows'; +import { v4 as uuid } from 'uuid'; -async function doClusterLifecycle(wf: WorkflowHandle, delaySeconds?: number): Promise { - await wf.signal(workflow.startClusterSignal); +import { clusterManagerWorkflow } from './workflows'; - const allocationUpdates: Promise[] = []; - for (let i = 0; i < 6; i++) { - allocationUpdates.push( - wf.executeUpdate(workflow.allocateNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] }) - ); - } - await Promise.all(allocationUpdates); - - if (delaySeconds) { - await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000)); - } - - const deletionUpdates: Promise[] = []; - for (let i = 0; i < 6; i++) { - deletionUpdates.push(wf.executeUpdate(workflow.deleteJobUpdate, { args: [{ jobName: `task-${i}` }] })); - } - await Promise.all(deletionUpdates); - - await wf.signal(workflow.shutdownClusterSignal); -} -async function main() { +export async function startClusterManager(): Promise> { const connection = await Connection.connect({ address: 'localhost:7233' }); const client = new Client({ connection }); - - // Define the workflow handle - const wfHandle = await client.workflow.start(workflow.clusterManagerWorkflow, { - args: [{ testContinueAsNew: true }], - taskQueue: 'tq', - workflowId: 'cluster-management-workflow', + return client.workflow.start(clusterManagerWorkflow, { + args: [{}], + taskQueue: 'safe-message-handlers-task-queue', + workflowId: `cluster-manager-${uuid()}`, }); - - // Start the cluster lifecycle - await doClusterLifecycle(wfHandle); } - -main().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts new file mode 100644 index 00000000..6f43a8cc --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -0,0 +1,155 @@ +import * as wf from '@temporalio/workflow'; +import type * as activities from './activities'; +import * as _3rdPartyAsyncMutexLibrary from 'async-mutex'; +import { + AssignNodesToJobUpdateInput, + ClusterManagerState, + ClusterManagerStateSummary, + DeleteJobUpdateInput, +} from './types'; + +const { assignNodesToJob, unassignNodesForJob } = wf.proxyActivities({ + startToCloseTimeout: '1 minute', +}); + +const { findBadNodes } = wf.proxyActivities({ + startToCloseTimeout: '1 minute', + retry: { + // This activity is called with the nodexMutex held. We do not retry, since retries would block + // cluster operations. + maximumAttempts: 1, + }, +}); + +// ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an +// API to started and shutdown the cluster, to assign jobs to nodes, and to delete jobs. The +// workflow maps this API to signals and updates. Operations altering node assignments must not +// interleave (must be serialized), and a standard (non-Temporal-specific) async mutex from a 3rd +// party library is used to ensure this. +export class ClusterManager { + state: ClusterManagerState; + jobsWithNodesAssigned: Set; + nodesMutex: _3rdPartyAsyncMutexLibrary.Mutex; + + constructor(state?: ClusterManagerState) { + this.state = state ?? { + clusterStarted: false, + clusterShutdown: false, + nodes: new Map(), + maxAssignedNodes: 0, + }; + this.jobsWithNodesAssigned = new Set(); + this.nodesMutex = new _3rdPartyAsyncMutexLibrary.Mutex(); + } + + startCluster(): void { + this.state.clusterStarted = true; + for (let i = 0; i < 25; i++) { + this.state.nodes.set(i.toString(), null); + } + wf.log.info('Cluster started'); + } + + async shutDownCluster(): Promise { + await wf.condition(() => this.state.clusterStarted); + this.state.clusterShutdown = true; + wf.log.info('Cluster shutdown'); + } + + async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise { + await wf.condition(() => this.state.clusterStarted); + if (this.state.clusterShutdown) { + // If you want the client to receive a failure, either add an update validator and throw the + // exception from there, or raise an ApplicationError. Other exceptions in the handler will + // cause the workflow to keep retrying and get it stuck. + throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down'); + } + return await this.nodesMutex.runExclusive(async (): Promise => { + // Idempotency guard: do nothing if the job already has nodes assigned. + if (!new Set(this.state.nodes.values()).has(input.jobName)) { + const unassignedNodes = this.getUnassignedNodes(); + if (input.numNodes > unassignedNodes.size) { + throw new wf.ApplicationFailure( + `Cannot assign ${input.numNodes} nodes; have only ${unassignedNodes.size} available` + ); + } + const nodesToAssign = Array.from(unassignedNodes).slice(0, input.numNodes); + // This await would be dangerous without the lock held because it would allow interleaving + // with the deleteJob and performHealthCheck operations, both of which mutate + // self.state.nodes. + await assignNodesToJob({ nodes: nodesToAssign, jobName: input.jobName }); + for (const node of nodesToAssign) { + this.state.nodes.set(node, input.jobName); + } + this.state.maxAssignedNodes = Math.max(this.state.maxAssignedNodes, this.getAssignedNodes().size); + } + return this.getStateSummary(); + }); + } + + async deleteJob(input: DeleteJobUpdateInput) { + await wf.condition(() => this.state.clusterStarted); + if (this.state.clusterShutdown) { + // If you want the client to receive a failure, either add an update validator and throw the + // exception from there, or raise an ApplicationError. Other exceptions in the handler will + // cause the workflow to keep retrying and get it stuck. + throw new wf.ApplicationFailure('Cannot delete job: Cluster is already shut down'); + } + await this.nodesMutex.runExclusive(async () => { + const nodesToUnassign = Array.from(this.state.nodes.entries()) + .filter(([_, v]) => v === input.jobName) + .map(([k, _]) => k); + // This await would be dangerous without the lock held because it would allow interleaving + // with the assignNodesToJob and performHealthCheck operations, both of which mutate + // self.state.nodes. + await unassignNodesForJob({ nodes: nodesToUnassign, jobName: input.jobName }); + for (const node of nodesToUnassign) { + this.state.nodes.set(node, null); + } + }); + } + + async performHealthChecks(): Promise { + wf.log.info('performHealthChecks'); + await this.nodesMutex.runExclusive(async () => { + const badNodes = await findBadNodes({ nodesToCheck: Array.from(this.getAssignedNodes()) }); + for (const node of badNodes) { + this.state.nodes.set(node, 'BAD!'); + } + }); + } + + getState(): ClusterManagerState { + return { + clusterStarted: this.state.clusterStarted, + clusterShutdown: this.state.clusterShutdown, + nodes: this.state.nodes, + maxAssignedNodes: this.state.maxAssignedNodes, + }; + } + + getStateSummary(): ClusterManagerStateSummary { + return { + maxAssignedNodes: this.state.maxAssignedNodes, + assignedNodes: this.getAssignedNodes().size, + badNodes: this.getBadNodes().size, + }; + } + + getUnassignedNodes(): Set { + return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === null)); + } + + getBadNodes(): Set { + return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === 'BAD!')); + } + + getAssignedNodes(jobName?: string): Set { + return new Set( + Array.from(this.state.nodes.keys()).filter((key) => { + const value = this.state.nodes.get(key); + return jobName ? value === jobName : value !== null && value !== 'BAD!'; + }) + ); + } +} diff --git a/updates_and_signals/safe_message_handlers/src/run-simulation.ts b/updates_and_signals/safe_message_handlers/src/run-simulation.ts new file mode 100644 index 00000000..60bd3d23 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/run-simulation.ts @@ -0,0 +1,36 @@ +import { WorkflowHandle } from '@temporalio/client'; + +import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows'; +import { startClusterManager } from './client'; + +async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise { + await wf.signal(startClusterSignal); + + const allocationUpdates: Promise[] = []; + for (let i = 0; i < 6; i++) { + allocationUpdates.push(wf.executeUpdate(assignNodesToJobUpdate, { args: [{ numNodes: 2, jobName: `task-${i}` }] })); + } + await Promise.all(allocationUpdates); + + if (delaySeconds) { + await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000)); + } + + const deletionUpdates: Promise[] = []; + for (let i = 0; i < 6; i++) { + deletionUpdates.push(wf.executeUpdate(deleteJobUpdate, { args: [{ jobName: `task-${i}` }] })); + } + await Promise.all(deletionUpdates); + + await wf.signal(shutdownClusterSignal); +} + +async function main() { + const workflow = await startClusterManager(); + await runSimulation(workflow); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/test.ts b/updates_and_signals/safe_message_handlers/src/test.ts new file mode 100644 index 00000000..593ffeaf --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/test.ts @@ -0,0 +1,65 @@ +import { + assignNodesToJobUpdate, + startClusterSignal, + shutdownClusterSignal, + deleteJobUpdate, + getClusterStatusQuery, +} from './workflows'; +import { startClusterManager } from './client'; +import assert from 'assert'; +import { ClusterManagerStateSummary } from './types'; + +async function testClusterManager() { + const workflow = await startClusterManager(); + await workflow.signal(startClusterSignal); + const request1 = { + numNodes: 5, + jobName: 'job1', + }; + + // Use an update to assign nodes. + const updateResult1 = await workflow.executeUpdate(assignNodesToJobUpdate, { + args: [request1], + }); + assert.equal(updateResult1.assignedNodes, request1.numNodes); + assert.equal(updateResult1.maxAssignedNodes, request1.numNodes); + + // Assign nodes to a job and then delete it + const request2 = { + numNodes: 6, + jobName: 'job2', + }; + const updateResult2 = await workflow.executeUpdate(assignNodesToJobUpdate, { + args: [request2], + }); + assert.equal(updateResult2.assignedNodes, request1.numNodes + request2.numNodes); + assert.equal(updateResult2.maxAssignedNodes, request1.numNodes + request2.numNodes); + + await workflow.executeUpdate(deleteJobUpdate, { args: [{ jobName: 'job2' }] }); + + // The delete doesn't return anything; use the query to get current cluster state + const queryResult = await workflow.query(getClusterStatusQuery); + assert.equal( + queryResult.assignedNodes, + request1.numNodes, + `expected ${request1.numNodes} left after deleting ${request2.numNodes}` + ); + assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes); + + // Terminate the workflow and check that workflow returns same value as obtained from last query. + await workflow.signal(shutdownClusterSignal); + const wfResult = await workflow.result(); + assert.deepEqual(wfResult, queryResult); +} + +async function runTests() { + for (const fn of [testClusterManager]) { + console.log(fn.name); + await fn(); + } +} + +runTests().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/types.ts b/updates_and_signals/safe_message_handlers/src/types.ts new file mode 100644 index 00000000..3493b50b --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/types.ts @@ -0,0 +1,31 @@ +export interface ClusterManagerState { + clusterStarted: boolean; + clusterShutdown: boolean; + nodes: Map; + maxAssignedNodes: number; +} + +export interface ClusterManagerInput { + state?: ClusterManagerState; +} + +export interface ClusterManagerStateSummary { + maxAssignedNodes: number; + assignedNodes: number; + badNodes: number; +} + +export interface AssignNodesToJobUpdateInput { + numNodes: number; + jobName: string; +} + +export interface DeleteJobUpdateInput { + jobName: string; +} + +export interface ClusterManagerWorkflowResult { + maxAssignedNodes: number; + numCurrentlyAssignedNodes: number; + numBadNodes: number; +} diff --git a/updates_and_signals/safe_message_handlers/src/worker.ts b/updates_and_signals/safe_message_handlers/src/worker.ts index d82929b7..e65d1e56 100644 --- a/updates_and_signals/safe_message_handlers/src/worker.ts +++ b/updates_and_signals/safe_message_handlers/src/worker.ts @@ -1,9 +1,11 @@ import { Worker } from '@temporalio/worker'; import path from 'path'; +import * as activities from './activities'; async function run() { const worker = await Worker.create({ workflowsPath: path.join(__dirname, './workflows.ts'), + activities, taskQueue: 'safe-message-handlers-task-queue', }); diff --git a/updates_and_signals/safe_message_handlers/src/workflow_test.ts b/updates_and_signals/safe_message_handlers/src/workflow_test.ts deleted file mode 100644 index 6f95198e..00000000 --- a/updates_and_signals/safe_message_handlers/src/workflow_test.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { WorkflowClient } from '@temporalio/client'; -import { ClusterManagerWorkflow } from './workflow'; -import { v4 as uuidv4 } from 'uuid'; - -async function run() { - const client = new WorkflowClient(); - - // Define the workflow handle - const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, { - workflowId: `cluster-management-workflow-${uuidv4()}`, - }); - - // Test workflow functionality - await workflow.start(); - await workflow.signal.startCluster(); - await workflow.executeUpdate('allocateNodesToJob', { - numNodes: 5, - jobName: 'job1', - }); - await workflow.signal.shutdownCluster(); - const result = await workflow.result(); - console.log('Workflow result:', result); -} - -run().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 2c349c37..273083a8 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -1,124 +1,71 @@ -// This file contains the TypeScript port of the Python workflow for managing cluster updates and signals - +import * as wf from '@temporalio/workflow'; +import * as _3rdPartyAsyncMutexLibrary from 'async-mutex'; +import { ClusterManager } from './cluster-manager'; import { - proxyActivities, - defineSignal, - defineUpdate, - defineQuery, - setHandler, - condition, - sleep, -} from '@temporalio/workflow'; -import type * as activities from './activities'; - -interface ClusterManagerState { - clusterStarted: boolean; - clusterShutdown: boolean; - nodes: { [key: string]: string | null }; - jobsAdded: Set; - maxAssignedNodes: number; -} - -interface ClusterManagerInput { - state?: ClusterManagerState; - testContinueAsNew: boolean; -} - -interface ClusterManagerResult { - maxAssignedNodes: number; - numCurrentlyAssignedNodes: number; - numBadNodes: number; -} - -export interface AllocateNodesToJobInput { - numNodes: number; - jobName: string; -} - -interface DeleteJobInput { - jobName: string; -} - -export interface ClusterManagerWorkflowInput { - testContinueAsNew: boolean; -} - -export interface ClusterManagerWorkflowResult { - maxAssignedNodes: number; - numCurrentlyAssignedNodes: number; - numBadNodes: number; -} - -// Message-handling API -export const startClusterSignal = defineSignal('startCluster'); -export const shutdownClusterSignal = defineSignal('shutdownCluster'); -export const allocateNodesToJobUpdate = defineUpdate('allocateNodesToJob'); -export const deleteJobUpdate = defineUpdate('deleteJob'); -const getClusterStatusQuery = defineQuery<{}>('getClusterStatus'); - -// Activities -const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities({ - startToCloseTimeout: '1 minute', -}); - -export async function clusterManagerWorkflow(input: ClusterManagerWorkflowInput) { - let state = { - clusterStarted: false, - clusterShutdown: false, - nodes: {} as Record, - jobsAdded: new Set(), - maxAssignedNodes: 0, - }; - - // Signal handlers - setHandler(startClusterSignal, () => { - state.clusterStarted = true; - for (let i = 0; i < 25; i++) { - state.nodes[i.toString()] = null; - } + AssignNodesToJobUpdateInput, + ClusterManagerInput, + ClusterManagerStateSummary, + DeleteJobUpdateInput, +} from './types'; + +export const startClusterSignal = wf.defineSignal('startCluster'); +export const shutdownClusterSignal = wf.defineSignal('shutdownCluster'); +export const assignNodesToJobUpdate = wf.defineUpdate( + 'allocateNodesToJob' +); +export const deleteJobUpdate = wf.defineUpdate('deleteJob'); +export const getClusterStatusQuery = wf.defineQuery('getClusterStatus'); + +export async function clusterManagerWorkflow(input: ClusterManagerInput): Promise { + const manager = new ClusterManager(input.state); + // + // Message-handling API + // + // We do not use `bind()` since it loses the function type information. + wf.setHandler(startClusterSignal, (...args) => manager.startCluster(...args)); + wf.setHandler(shutdownClusterSignal, (...args) => manager.shutDownCluster(...args)); + + // This is an update as opposed to a signal because the client may want to wait for nodes to be + // allocated before sending work to those nodes. Returns the array of node names that were + // allocated to the job. + wf.setHandler(assignNodesToJobUpdate, (...args) => manager.assignNodesToJob(...args), { + validator: async (input: AssignNodesToJobUpdateInput): Promise => { + if (input.numNodes <= 0) { + throw new Error(`numNodes must be positive (got ${input.numNodes})`); + } + }, }); - setHandler(shutdownClusterSignal, () => { - state.clusterShutdown = true; - }); + // Even though it returns nothing, this is an update because the client may want to track it, for + // example to wait for nodes to be unassigned before reassigning them. + wf.setHandler(deleteJobUpdate, (...args) => manager.deleteJob(...args)); + wf.setHandler(getClusterStatusQuery, (...args) => manager.getStateSummary(...args)); - setHandler(allocateNodesToJobUpdate, async (input: AllocateNodesToJobInput): Promise => { - if (!state.clusterStarted || state.clusterShutdown) { - throw new Error('Cluster is not in a valid state for node allocation'); + // + // Main workflow logic + // + // The cluster manager workflow is a long-running workflow ("entity" workflow). Most of its logic + // lies in the message-processing handlers implented in the ClusterManager class. The main + // workflow itself is a loop that does the following: + // - process messages + // - perform health check at regular intervals + // - continue-as-new when suggested + // + const healthCheckIntervalSeconds = 10; + + await wf.condition(() => manager.state.clusterStarted); + for (;;) { + await manager.performHealthChecks(); + await wf.condition( + () => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested, + healthCheckIntervalSeconds * 1000 + ); + if (manager.state.clusterShutdown) { + break; } - // Allocate nodes to job logic - return []; - }); - - setHandler(deleteJobUpdate, async (input: DeleteJobInput) => { - if (!state.clusterStarted || state.clusterShutdown) { - throw new Error('Cluster is not in a valid state for node deallocation'); + if (wf.workflowInfo().continueAsNewSuggested) { + await wf.continueAsNew({ state: manager.getState() }); } - // Deallocate nodes from job logic - }); - - // Query handler - setHandler(getClusterStatusQuery, () => { - return { - clusterStarted: state.clusterStarted, - clusterShutdown: state.clusterShutdown, - numNodes: Object.keys(state.nodes).length, - numAssignedNodes: Object.values(state.nodes).filter((n) => n !== null).length, - }; - }); - - // Main workflow logic - await condition(() => state.clusterStarted, 'Waiting for cluster to start'); - // Perform operations while cluster is active - while (!state.clusterShutdown) { - // Example: perform periodic health checks - await sleep(60000); // Sleep for 60 seconds } - - // Return workflow result - return { - maxAssignedNodes: state.maxAssignedNodes, - numCurrentlyAssignedNodes: Object.values(state.nodes).filter((n) => n !== null).length, - numBadNodes: Object.values(state.nodes).filter((n) => n === 'BAD').length, - }; + return manager.getStateSummary(); } From 50ef7a708c06bef60592d901648ef736300024f0 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 3 Jul 2024 14:12:19 -0400 Subject: [PATCH 06/21] Move healthcheck into stubbed activity We want to avoid encouraging users to poll from an entity workflow loop, even if the interval is long in the example. --- .../safe_message_handlers/src/activities.ts | 19 +++++----------- .../src/cluster-manager.ts | 19 +++++----------- .../safe_message_handlers/src/test.ts | 1 - .../safe_message_handlers/src/workflows.ts | 22 +++++++------------ 4 files changed, 19 insertions(+), 42 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts index 6745102c..8016534b 100644 --- a/updates_and_signals/safe_message_handlers/src/activities.ts +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -8,10 +8,6 @@ interface UnassignNodesForJobInput { jobName: string; } -interface FindBadNodesInput { - nodesToCheck: string[]; -} - export async function assignNodesToJob(input: AssignNodesToJobInput): Promise { console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); await sleep(100); // Simulate RPC @@ -22,16 +18,13 @@ export async function unassignNodesForJob(input: UnassignNodesForJobInput): Prom await sleep(100); // Simulate RPC } -export async function findBadNodes(input: FindBadNodesInput): Promise { - console.log('Finding bad nodes'); - await sleep(100); // Simulate RPC - const badNodes = input.nodesToCheck.filter((n) => parseInt(n) % 5 === 0); - if (badNodes.length) { - console.log(`Found bad nodes: ${badNodes}`); - } else { - console.log('No new bad nodes found.'); +export async function performHealthChecks(): Promise { + const healthCheckInterval = 10 * 1000; + for (;;) { + console.log(`performing health check`); + await sleep(100); // Simulate RPC + await sleep(healthCheckInterval); } - return badNodes; } async function sleep(ms: number): Promise { diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts index 6f43a8cc..526d2520 100644 --- a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -9,16 +9,7 @@ import { } from './types'; const { assignNodesToJob, unassignNodesForJob } = wf.proxyActivities({ - startToCloseTimeout: '1 minute', -}); - -const { findBadNodes } = wf.proxyActivities({ - startToCloseTimeout: '1 minute', - retry: { - // This activity is called with the nodexMutex held. We do not retry, since retries would block - // cluster operations. - maximumAttempts: 1, - }, + startToCloseTimeout: '1 minute', // TODO }); // ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an @@ -42,11 +33,12 @@ export class ClusterManager { this.nodesMutex = new _3rdPartyAsyncMutexLibrary.Mutex(); } - startCluster(): void { + async startCluster(): Promise { this.state.clusterStarted = true; for (let i = 0; i < 25; i++) { this.state.nodes.set(i.toString(), null); } + wf.scheduleActivity('performHealthChecks', [], { scheduleToCloseTimeout: 24 * 60 * 60 * 1000 }); wf.log.info('Cluster started'); } @@ -109,10 +101,9 @@ export class ClusterManager { }); } - async performHealthChecks(): Promise { - wf.log.info('performHealthChecks'); + async notifyBadNodes(badNodes: string[]): Promise { + wf.log.info('handleBadNodesNotification'); await this.nodesMutex.runExclusive(async () => { - const badNodes = await findBadNodes({ nodesToCheck: Array.from(this.getAssignedNodes()) }); for (const node of badNodes) { this.state.nodes.set(node, 'BAD!'); } diff --git a/updates_and_signals/safe_message_handlers/src/test.ts b/updates_and_signals/safe_message_handlers/src/test.ts index 593ffeaf..07842df8 100644 --- a/updates_and_signals/safe_message_handlers/src/test.ts +++ b/updates_and_signals/safe_message_handlers/src/test.ts @@ -7,7 +7,6 @@ import { } from './workflows'; import { startClusterManager } from './client'; import assert from 'assert'; -import { ClusterManagerStateSummary } from './types'; async function testClusterManager() { const workflow = await startClusterManager(); diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 273083a8..6e40b581 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -14,6 +14,7 @@ export const assignNodesToJobUpdate = wf.defineUpdate('deleteJob'); +export const notifyBadNodesSignal = wf.defineSignal<[string[]]>('notifyBadNodes'); export const getClusterStatusQuery = wf.defineQuery('getClusterStatus'); export async function clusterManagerWorkflow(input: ClusterManagerInput): Promise { @@ -21,14 +22,13 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // // Message-handling API // - // We do not use `bind()` since it loses the function type information. - wf.setHandler(startClusterSignal, (...args) => manager.startCluster(...args)); - wf.setHandler(shutdownClusterSignal, (...args) => manager.shutDownCluster(...args)); + wf.setHandler(startClusterSignal, () => manager.startCluster()); + wf.setHandler(shutdownClusterSignal, () => manager.shutDownCluster()); // This is an update as opposed to a signal because the client may want to wait for nodes to be // allocated before sending work to those nodes. Returns the array of node names that were // allocated to the job. - wf.setHandler(assignNodesToJobUpdate, (...args) => manager.assignNodesToJob(...args), { + wf.setHandler(assignNodesToJobUpdate, (input) => manager.assignNodesToJob(input), { validator: async (input: AssignNodesToJobUpdateInput): Promise => { if (input.numNodes <= 0) { throw new Error(`numNodes must be positive (got ${input.numNodes})`); @@ -38,8 +38,9 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // Even though it returns nothing, this is an update because the client may want to track it, for // example to wait for nodes to be unassigned before reassigning them. - wf.setHandler(deleteJobUpdate, (...args) => manager.deleteJob(...args)); - wf.setHandler(getClusterStatusQuery, (...args) => manager.getStateSummary(...args)); + wf.setHandler(deleteJobUpdate, (input) => manager.deleteJob(input)); + wf.setHandler(notifyBadNodesSignal, (input) => manager.notifyBadNodes(input)); + wf.setHandler(getClusterStatusQuery, () => manager.getStateSummary()); // // Main workflow logic @@ -48,18 +49,11 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // lies in the message-processing handlers implented in the ClusterManager class. The main // workflow itself is a loop that does the following: // - process messages - // - perform health check at regular intervals // - continue-as-new when suggested // - const healthCheckIntervalSeconds = 10; - await wf.condition(() => manager.state.clusterStarted); for (;;) { - await manager.performHealthChecks(); - await wf.condition( - () => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested, - healthCheckIntervalSeconds * 1000 - ); + await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); if (manager.state.clusterShutdown) { break; } From 51685ddc4bd704f490ecc9c888f6ea09524d7cb3 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 03:24:47 -0400 Subject: [PATCH 07/21] Delete health check pending implementation as long-running activity --- .../safe_message_handlers/src/activities.ts | 9 --------- .../safe_message_handlers/src/cluster-manager.ts | 15 ++------------- .../safe_message_handlers/src/workflows.ts | 2 -- 3 files changed, 2 insertions(+), 24 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts index 8016534b..4efe4fd9 100644 --- a/updates_and_signals/safe_message_handlers/src/activities.ts +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -18,15 +18,6 @@ export async function unassignNodesForJob(input: UnassignNodesForJobInput): Prom await sleep(100); // Simulate RPC } -export async function performHealthChecks(): Promise { - const healthCheckInterval = 10 * 1000; - for (;;) { - console.log(`performing health check`); - await sleep(100); // Simulate RPC - await sleep(healthCheckInterval); - } -} - async function sleep(ms: number): Promise { await new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts index 526d2520..b16e3595 100644 --- a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -33,12 +33,11 @@ export class ClusterManager { this.nodesMutex = new _3rdPartyAsyncMutexLibrary.Mutex(); } - async startCluster(): Promise { + startCluster(): void { this.state.clusterStarted = true; for (let i = 0; i < 25; i++) { this.state.nodes.set(i.toString(), null); } - wf.scheduleActivity('performHealthChecks', [], { scheduleToCloseTimeout: 24 * 60 * 60 * 1000 }); wf.log.info('Cluster started'); } @@ -67,8 +66,7 @@ export class ClusterManager { } const nodesToAssign = Array.from(unassignedNodes).slice(0, input.numNodes); // This await would be dangerous without the lock held because it would allow interleaving - // with the deleteJob and performHealthCheck operations, both of which mutate - // self.state.nodes. + // with the deleteJob operation, which mutates self.state.nodes. await assignNodesToJob({ nodes: nodesToAssign, jobName: input.jobName }); for (const node of nodesToAssign) { this.state.nodes.set(node, input.jobName); @@ -101,15 +99,6 @@ export class ClusterManager { }); } - async notifyBadNodes(badNodes: string[]): Promise { - wf.log.info('handleBadNodesNotification'); - await this.nodesMutex.runExclusive(async () => { - for (const node of badNodes) { - this.state.nodes.set(node, 'BAD!'); - } - }); - } - getState(): ClusterManagerState { return { clusterStarted: this.state.clusterStarted, diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 6e40b581..42afbcec 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -14,7 +14,6 @@ export const assignNodesToJobUpdate = wf.defineUpdate('deleteJob'); -export const notifyBadNodesSignal = wf.defineSignal<[string[]]>('notifyBadNodes'); export const getClusterStatusQuery = wf.defineQuery('getClusterStatus'); export async function clusterManagerWorkflow(input: ClusterManagerInput): Promise { @@ -39,7 +38,6 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // Even though it returns nothing, this is an update because the client may want to track it, for // example to wait for nodes to be unassigned before reassigning them. wf.setHandler(deleteJobUpdate, (input) => manager.deleteJob(input)); - wf.setHandler(notifyBadNodesSignal, (input) => manager.notifyBadNodes(input)); wf.setHandler(getClusterStatusQuery, () => manager.getStateSummary()); // From ecbba8c18cf7f28541e25ed38fe3a3948508a48e Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 03:55:52 -0400 Subject: [PATCH 08/21] Edit comment --- .../safe_message_handlers/src/cluster-manager.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts index b16e3595..8455f69d 100644 --- a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -13,10 +13,13 @@ const { assignNodesToJob, unassignNodesForJob } = wf.proxyActivities; From 941ed36347a3d7ac1aea0a9775717af345fc9f6d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 03:57:07 -0400 Subject: [PATCH 09/21] Add startCluster RPC --- updates_and_signals/safe_message_handlers/src/activities.ts | 5 +++++ .../safe_message_handlers/src/cluster-manager.ts | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts index 4efe4fd9..f5c8e98f 100644 --- a/updates_and_signals/safe_message_handlers/src/activities.ts +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -8,6 +8,11 @@ interface UnassignNodesForJobInput { jobName: string; } +export async function startCluster(): Promise { + console.log('Starting cluster'); + await sleep(100); // Simulate RPC +} + export async function assignNodesToJob(input: AssignNodesToJobInput): Promise { console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); await sleep(100); // Simulate RPC diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts index 8455f69d..a4a01222 100644 --- a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -8,7 +8,7 @@ import { DeleteJobUpdateInput, } from './types'; -const { assignNodesToJob, unassignNodesForJob } = wf.proxyActivities({ +const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActivities({ startToCloseTimeout: '1 minute', // TODO }); @@ -36,7 +36,8 @@ export class ClusterManager { this.nodesMutex = new _3rdPartyAsyncMutexLibrary.Mutex(); } - startCluster(): void { + async startCluster(): Promise { + await startCluster(); this.state.clusterStarted = true; for (let i = 0; i < 25; i++) { this.state.nodes.set(i.toString(), null); From 396887ffaf6384f5a6973c55c92118abc27bc2e6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 12 Jul 2024 16:47:14 -0400 Subject: [PATCH 10/21] Don't use strange import style --- .../safe_message_handlers/src/cluster-manager.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts index a4a01222..e4446287 100644 --- a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -1,6 +1,6 @@ import * as wf from '@temporalio/workflow'; import type * as activities from './activities'; -import * as _3rdPartyAsyncMutexLibrary from 'async-mutex'; +import { Mutex } from 'async-mutex'; import { AssignNodesToJobUpdateInput, ClusterManagerState, @@ -23,7 +23,7 @@ const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActiviti export class ClusterManager { state: ClusterManagerState; jobsWithNodesAssigned: Set; - nodesMutex: _3rdPartyAsyncMutexLibrary.Mutex; + nodesMutex: Mutex; constructor(state?: ClusterManagerState) { this.state = state ?? { @@ -33,7 +33,7 @@ export class ClusterManager { maxAssignedNodes: 0, }; this.jobsWithNodesAssigned = new Set(); - this.nodesMutex = new _3rdPartyAsyncMutexLibrary.Mutex(); + this.nodesMutex = new Mutex(); } async startCluster(): Promise { From d391361b92ca84d88abf5ca1fb3d3d89c9953bd6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 12 Jul 2024 17:10:32 -0400 Subject: [PATCH 11/21] test --- .../safe_message_handlers/package.json | 2 +- .../safe_message_handlers/src/test.ts | 64 ------------- .../src/test/workflows.test.ts | 90 +++++++++++++++++++ 3 files changed, 91 insertions(+), 65 deletions(-) delete mode 100644 updates_and_signals/safe_message_handlers/src/test.ts create mode 100644 updates_and_signals/safe_message_handlers/src/test/workflows.test.ts diff --git a/updates_and_signals/safe_message_handlers/package.json b/updates_and_signals/safe_message_handlers/package.json index 14efb0c1..5b320346 100644 --- a/updates_and_signals/safe_message_handlers/package.json +++ b/updates_and_signals/safe_message_handlers/package.json @@ -10,7 +10,7 @@ "start.watch": "nodemon src/worker.ts", "simulation": "ts-node src/run-simulation.ts", "format": "prettier --config .prettierrc 'src/**/*.ts' --write", - "test": "ts-node src/test.ts" + "test": "mocha --exit --require ts-node/register src/test/*.test.ts" }, "nodemonConfig": { "execMap": { diff --git a/updates_and_signals/safe_message_handlers/src/test.ts b/updates_and_signals/safe_message_handlers/src/test.ts deleted file mode 100644 index 07842df8..00000000 --- a/updates_and_signals/safe_message_handlers/src/test.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { - assignNodesToJobUpdate, - startClusterSignal, - shutdownClusterSignal, - deleteJobUpdate, - getClusterStatusQuery, -} from './workflows'; -import { startClusterManager } from './client'; -import assert from 'assert'; - -async function testClusterManager() { - const workflow = await startClusterManager(); - await workflow.signal(startClusterSignal); - const request1 = { - numNodes: 5, - jobName: 'job1', - }; - - // Use an update to assign nodes. - const updateResult1 = await workflow.executeUpdate(assignNodesToJobUpdate, { - args: [request1], - }); - assert.equal(updateResult1.assignedNodes, request1.numNodes); - assert.equal(updateResult1.maxAssignedNodes, request1.numNodes); - - // Assign nodes to a job and then delete it - const request2 = { - numNodes: 6, - jobName: 'job2', - }; - const updateResult2 = await workflow.executeUpdate(assignNodesToJobUpdate, { - args: [request2], - }); - assert.equal(updateResult2.assignedNodes, request1.numNodes + request2.numNodes); - assert.equal(updateResult2.maxAssignedNodes, request1.numNodes + request2.numNodes); - - await workflow.executeUpdate(deleteJobUpdate, { args: [{ jobName: 'job2' }] }); - - // The delete doesn't return anything; use the query to get current cluster state - const queryResult = await workflow.query(getClusterStatusQuery); - assert.equal( - queryResult.assignedNodes, - request1.numNodes, - `expected ${request1.numNodes} left after deleting ${request2.numNodes}` - ); - assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes); - - // Terminate the workflow and check that workflow returns same value as obtained from last query. - await workflow.signal(shutdownClusterSignal); - const wfResult = await workflow.result(); - assert.deepEqual(wfResult, queryResult); -} - -async function runTests() { - for (const fn of [testClusterManager]) { - console.log(fn.name); - await fn(); - } -} - -runTests().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts b/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts new file mode 100644 index 00000000..057fdf1e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts @@ -0,0 +1,90 @@ +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { before, describe, it } from 'mocha'; +import { bundleWorkflowCode, WorkflowBundleWithSourceMap, DefaultLogger, Runtime, Worker } from '@temporalio/worker'; +import * as activities from '../activities'; +import { + clusterManagerWorkflow, + assignNodesToJobUpdate, + startClusterSignal, + shutdownClusterSignal, + deleteJobUpdate, + getClusterStatusQuery, +} from '../workflows'; +import { nanoid } from 'nanoid'; +import assert from 'assert'; + +const taskQueue = 'test' + new Date().toLocaleDateString('en-US'); + +describe('cluster manager', function () { + this.timeout(10000); + let worker: Worker; + let env: TestWorkflowEnvironment; + let workflowBundle: WorkflowBundleWithSourceMap; + + before(async function () { + Runtime.install({ logger: new DefaultLogger('WARN') }); + env = await TestWorkflowEnvironment.createLocal(); + + workflowBundle = await bundleWorkflowCode({ + workflowsPath: require.resolve('../workflows'), + logger: new DefaultLogger('WARN'), + }); + }); + + beforeEach(async function () { + worker = await Worker.create({ + connection: env.nativeConnection, + workflowBundle, + activities, + taskQueue, + }); + }); + + after(async function () { + await env.teardown(); + }); + + it('successfully completes a session', async function () { + await worker.runUntil(async function () { + const workflow = await env.client.workflow.start(clusterManagerWorkflow, { + args: [{}], + taskQueue, + workflowId: 'cluster-manager-' + nanoid(), + }); + await workflow.signal(startClusterSignal); + const request1 = { + numNodes: 5, + jobName: 'job1', + }; + // Use an update to assign nodes. + const updateResult1 = await workflow.executeUpdate(assignNodesToJobUpdate, { + args: [request1], + }); + assert.equal(updateResult1.assignedNodes, request1.numNodes); + assert.equal(updateResult1.maxAssignedNodes, request1.numNodes); + // Assign nodes to a job and then delete it + const request2 = { + numNodes: 6, + jobName: 'job2', + }; + const updateResult2 = await workflow.executeUpdate(assignNodesToJobUpdate, { + args: [request2], + }); + assert.equal(updateResult2.assignedNodes, request1.numNodes + request2.numNodes); + assert.equal(updateResult2.maxAssignedNodes, request1.numNodes + request2.numNodes); + await workflow.executeUpdate(deleteJobUpdate, { args: [{ jobName: 'job2' }] }); + // The delete doesn't return anything; use the query to get current cluster state + const queryResult = await workflow.query(getClusterStatusQuery); + assert.equal( + queryResult.assignedNodes, + request1.numNodes, + `expected ${request1.numNodes} left after deleting ${request2.numNodes}` + ); + assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes); + // Terminate the workflow and check that workflow returns same value as obtained from last query. + await workflow.signal(shutdownClusterSignal); + const wfResult = await workflow.result(); + assert.deepEqual(wfResult, queryResult); + }); + }); +}); From 547c2f2a9888e45edf95a144622b22547da27db0 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 12 Jul 2024 18:14:42 -0400 Subject: [PATCH 12/21] Use activities.{log,info} --- .../safe_message_handlers/src/activities.ts | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts index f5c8e98f..1e2399f0 100644 --- a/updates_and_signals/safe_message_handlers/src/activities.ts +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -1,3 +1,5 @@ +import * as activities from '@temporalio/activity'; + interface AssignNodesToJobInput { nodes: string[]; jobName: string; @@ -9,20 +11,16 @@ interface UnassignNodesForJobInput { } export async function startCluster(): Promise { - console.log('Starting cluster'); - await sleep(100); // Simulate RPC + activities.log.info('Starting cluster'); + await activities.sleep(100); // Simulate RPC } export async function assignNodesToJob(input: AssignNodesToJobInput): Promise { - console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); - await sleep(100); // Simulate RPC + activities.log.info(`Assigning nodes ${input.nodes} to job ${input.jobName}`); + await activities.sleep(100); // Simulate RPC } export async function unassignNodesForJob(input: UnassignNodesForJobInput): Promise { - console.log(`Unassigning nodes ${input.nodes} from job ${input.jobName}`); - await sleep(100); // Simulate RPC -} - -async function sleep(ms: number): Promise { - await new Promise((resolve) => setTimeout(resolve, ms)); + activities.log.info(`Unassigning nodes ${input.nodes} from job ${input.jobName}`); + await activities.sleep(100); // Simulate RPC } From 8370804e5ca42fc7ee796f331c9817fdb62ac80d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 12 Jul 2024 18:19:19 -0400 Subject: [PATCH 13/21] Use setTimeout from 'timers/promises' --- .../safe_message_handlers/src/run-simulation.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/updates_and_signals/safe_message_handlers/src/run-simulation.ts b/updates_and_signals/safe_message_handlers/src/run-simulation.ts index 60bd3d23..9096c888 100644 --- a/updates_and_signals/safe_message_handlers/src/run-simulation.ts +++ b/updates_and_signals/safe_message_handlers/src/run-simulation.ts @@ -2,6 +2,7 @@ import { WorkflowHandle } from '@temporalio/client'; import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows'; import { startClusterManager } from './client'; +import { setTimeout } from 'timers/promises'; async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise { await wf.signal(startClusterSignal); @@ -13,7 +14,7 @@ async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise await Promise.all(allocationUpdates); if (delaySeconds) { - await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000)); + await setTimeout(delaySeconds * 1000); } const deletionUpdates: Promise[] = []; From 9657f0a96d06acc2a8b9ce4ac356570c47a7f4ec Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 12 Jul 2024 18:24:04 -0400 Subject: [PATCH 14/21] Remove unused import --- updates_and_signals/safe_message_handlers/src/workflows.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 42afbcec..73a95d2f 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -1,5 +1,4 @@ import * as wf from '@temporalio/workflow'; -import * as _3rdPartyAsyncMutexLibrary from 'async-mutex'; import { ClusterManager } from './cluster-manager'; import { AssignNodesToJobUpdateInput, From b301b96b1e1279c64419e7d7b5e4c57cd85994f8 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 24 Jul 2024 09:47:18 -0400 Subject: [PATCH 15/21] Delete remaining "bad nodes" concept --- .../safe_message_handlers/src/cluster-manager.ts | 5 ----- updates_and_signals/safe_message_handlers/src/types.ts | 1 - 2 files changed, 6 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts index e4446287..8c4c6b29 100644 --- a/updates_and_signals/safe_message_handlers/src/cluster-manager.ts +++ b/updates_and_signals/safe_message_handlers/src/cluster-manager.ts @@ -116,7 +116,6 @@ export class ClusterManager { return { maxAssignedNodes: this.state.maxAssignedNodes, assignedNodes: this.getAssignedNodes().size, - badNodes: this.getBadNodes().size, }; } @@ -124,10 +123,6 @@ export class ClusterManager { return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === null)); } - getBadNodes(): Set { - return new Set(Array.from(this.state.nodes.keys()).filter((key) => this.state.nodes.get(key) === 'BAD!')); - } - getAssignedNodes(jobName?: string): Set { return new Set( Array.from(this.state.nodes.keys()).filter((key) => { diff --git a/updates_and_signals/safe_message_handlers/src/types.ts b/updates_and_signals/safe_message_handlers/src/types.ts index 3493b50b..6454944b 100644 --- a/updates_and_signals/safe_message_handlers/src/types.ts +++ b/updates_and_signals/safe_message_handlers/src/types.ts @@ -12,7 +12,6 @@ export interface ClusterManagerInput { export interface ClusterManagerStateSummary { maxAssignedNodes: number; assignedNodes: number; - badNodes: number; } export interface AssignNodesToJobUpdateInput { From 46e2df0fe90b0c12edc688f3f3ae8f313f0cc658 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 24 Jul 2024 10:27:33 -0400 Subject: [PATCH 16/21] Remove unnecessary control logic and otherwise simplify main loop --- .../safe_message_handlers/src/workflows.ts | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 73a95d2f..0bd44a0d 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -44,19 +44,13 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // // The cluster manager workflow is a long-running workflow ("entity" workflow). Most of its logic // lies in the message-processing handlers implented in the ClusterManager class. The main - // workflow itself is a loop that does the following: - // - process messages - // - continue-as-new when suggested - // - await wf.condition(() => manager.state.clusterStarted); - for (;;) { - await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); - if (manager.state.clusterShutdown) { - break; - } - if (wf.workflowInfo().continueAsNewSuggested) { - await wf.continueAsNew({ state: manager.getState() }); - } + // workflow itself simply waits until the cluster is shutdown, or the workflow needs to + // continue-as-new. + await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); + if (wf.workflowInfo().continueAsNewSuggested) { + await wf.continueAsNew({ state: manager.getState() }); + return undefined as never; + } else { + return manager.getStateSummary(); } - return manager.getStateSummary(); } From ea84f781b11a1ccc272e40fa1680df9579e313f2 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 28 Aug 2024 22:50:53 -0400 Subject: [PATCH 17/21] Bump sdk version --- updates_and_signals/safe_message_handlers/package.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/package.json b/updates_and_signals/safe_message_handlers/package.json index 5b320346..ea7ee4e7 100644 --- a/updates_and_signals/safe_message_handlers/package.json +++ b/updates_and_signals/safe_message_handlers/package.json @@ -22,16 +22,16 @@ ] }, "dependencies": { - "@temporalio/activity": "^1.9.0", - "@temporalio/client": "^1.9.0", - "@temporalio/worker": "^1.9.0", - "@temporalio/workflow": "^1.9.0", + "@temporalio/activity": "^1.11.1", + "@temporalio/client": "^1.11.1", + "@temporalio/worker": "^1.11.1", + "@temporalio/workflow": "^1.11.1", "async-mutex": "^0.5.0", "nanoid": "3.x", "uuid": "^10.0.0" }, "devDependencies": { - "@temporalio/testing": "^1.9.0", + "@temporalio/testing": "^1.11.1", "@tsconfig/node16": "^1.0.0", "@types/mocha": "8.x", "@types/node": "^16.11.43", From 1a7c8c744104d24ae2526c17720b0f1b1226ec71 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 28 Aug 2024 22:55:33 -0400 Subject: [PATCH 18/21] Send fewer bytes in workflow ID --- updates_and_signals/safe_message_handlers/src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/updates_and_signals/safe_message_handlers/src/client.ts b/updates_and_signals/safe_message_handlers/src/client.ts index d9a42e39..39524c26 100644 --- a/updates_and_signals/safe_message_handlers/src/client.ts +++ b/updates_and_signals/safe_message_handlers/src/client.ts @@ -9,6 +9,6 @@ export async function startClusterManager(): Promise Date: Wed, 28 Aug 2024 23:08:27 -0400 Subject: [PATCH 19/21] wait for handlers to finish before CAN --- updates_and_signals/safe_message_handlers/src/workflows.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 0bd44a0d..ff1e9e10 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -48,6 +48,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // continue-as-new. await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); if (wf.workflowInfo().continueAsNewSuggested) { + await wf.condition(wf.allHandlersFinished); await wf.continueAsNew({ state: manager.getState() }); return undefined as never; } else { From d5674ec943de2439ffe902e48cdbfd80e7bf089d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 29 Aug 2024 17:07:41 -0400 Subject: [PATCH 20/21] Add comment --- updates_and_signals/safe_message_handlers/src/workflows.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index ff1e9e10..5931ae5a 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -48,6 +48,12 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // continue-as-new. await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); if (wf.workflowInfo().continueAsNewSuggested) { + // You should typically wait for all async handlers to finish before + // completing a workflow or continuing as new. If the main workflow method + // is scheduling activities or child workflows, then you should typically + // also arrange that they are completed before completing or continuing as + // new. This sample does not schedule any activities or child workflows, so + // it is sufficient just to wait for handlers to finish. await wf.condition(wf.allHandlersFinished); await wf.continueAsNew({ state: manager.getState() }); return undefined as never; From 479a36f1c1cafeac65a6997a47ecf2f0a134ef53 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 3 Sep 2024 21:51:26 -0400 Subject: [PATCH 21/21] Address code review comments --- .../safe_message_handlers/package.json | 4 +--- .../safe_message_handlers/src/client.ts | 5 ++--- .../src/cluster-manager.ts | 17 ++++++++++------- .../src/test/workflows.test.ts | 3 +-- .../safe_message_handlers/src/workflows.ts | 10 ++++++---- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/updates_and_signals/safe_message_handlers/package.json b/updates_and_signals/safe_message_handlers/package.json index ea7ee4e7..443bbf93 100644 --- a/updates_and_signals/safe_message_handlers/package.json +++ b/updates_and_signals/safe_message_handlers/package.json @@ -27,15 +27,13 @@ "@temporalio/worker": "^1.11.1", "@temporalio/workflow": "^1.11.1", "async-mutex": "^0.5.0", - "nanoid": "3.x", - "uuid": "^10.0.0" + "nanoid": "3.x" }, "devDependencies": { "@temporalio/testing": "^1.11.1", "@tsconfig/node16": "^1.0.0", "@types/mocha": "8.x", "@types/node": "^16.11.43", - "@types/uuid": "^10.0.0", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint": "^7.32.0", diff --git a/updates_and_signals/safe_message_handlers/src/client.ts b/updates_and_signals/safe_message_handlers/src/client.ts index 39524c26..a42b3965 100644 --- a/updates_and_signals/safe_message_handlers/src/client.ts +++ b/updates_and_signals/safe_message_handlers/src/client.ts @@ -1,5 +1,5 @@ import { Connection, Client, WorkflowHandle } from '@temporalio/client'; -import { v4 as uuid } from 'uuid'; +import { nanoid } from 'nanoid'; import { clusterManagerWorkflow } from './workflows'; @@ -7,8 +7,7 @@ export async function startClusterManager(): Promise({ - startToCloseTimeout: '1 minute', // TODO + startToCloseTimeout: '1 minute', }); // ClusterManagerWorkflow keeps track of the job assignments of a cluster of nodes. It exposes an @@ -22,7 +22,7 @@ const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActiviti // from a 3rd party library is used to ensure this. export class ClusterManager { state: ClusterManagerState; - jobsWithNodesAssigned: Set; + seenJobs: Set; nodesMutex: Mutex; constructor(state?: ClusterManagerState) { @@ -32,8 +32,8 @@ export class ClusterManager { nodes: new Map(), maxAssignedNodes: 0, }; - this.jobsWithNodesAssigned = new Set(); this.nodesMutex = new Mutex(); + this.seenJobs = new Set(); } async startCluster(): Promise { @@ -61,7 +61,7 @@ export class ClusterManager { } return await this.nodesMutex.runExclusive(async (): Promise => { // Idempotency guard: do nothing if the job already has nodes assigned. - if (!new Set(this.state.nodes.values()).has(input.jobName)) { + if (!this.seenJobs.has(input.jobName)) { const unassignedNodes = this.getUnassignedNodes(); if (input.numNodes > unassignedNodes.size) { throw new wf.ApplicationFailure( @@ -75,6 +75,7 @@ export class ClusterManager { for (const node of nodesToAssign) { this.state.nodes.set(node, input.jobName); } + this.seenJobs.add(input.jobName); this.state.maxAssignedNodes = Math.max(this.state.maxAssignedNodes, this.getAssignedNodes().size); } return this.getStateSummary(); @@ -94,8 +95,7 @@ export class ClusterManager { .filter(([_, v]) => v === input.jobName) .map(([k, _]) => k); // This await would be dangerous without the lock held because it would allow interleaving - // with the assignNodesToJob and performHealthCheck operations, both of which mutate - // self.state.nodes. + // with the assignNodesToJob operation, which mutates self.state.nodes. await unassignNodesForJob({ nodes: nodesToUnassign, jobName: input.jobName }); for (const node of nodesToUnassign) { this.state.nodes.set(node, null); @@ -127,7 +127,10 @@ export class ClusterManager { return new Set( Array.from(this.state.nodes.keys()).filter((key) => { const value = this.state.nodes.get(key); - return jobName ? value === jobName : value !== null && value !== 'BAD!'; + if (jobName === undefined) { + return value !== null && value !== 'BAD!'; + } + return value === jobName; }) ); } diff --git a/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts b/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts index 057fdf1e..383328a1 100644 --- a/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts +++ b/updates_and_signals/safe_message_handlers/src/test/workflows.test.ts @@ -47,9 +47,8 @@ describe('cluster manager', function () { it('successfully completes a session', async function () { await worker.runUntil(async function () { const workflow = await env.client.workflow.start(clusterManagerWorkflow, { - args: [{}], taskQueue, - workflowId: 'cluster-manager-' + nanoid(), + workflowId: `cluster-manager-${nanoid()}`, }); await workflow.signal(startClusterSignal); const request1 = { diff --git a/updates_and_signals/safe_message_handlers/src/workflows.ts b/updates_and_signals/safe_message_handlers/src/workflows.ts index 5931ae5a..5605f3ae 100644 --- a/updates_and_signals/safe_message_handlers/src/workflows.ts +++ b/updates_and_signals/safe_message_handlers/src/workflows.ts @@ -15,7 +15,7 @@ export const assignNodesToJobUpdate = wf.defineUpdate('deleteJob'); export const getClusterStatusQuery = wf.defineQuery('getClusterStatus'); -export async function clusterManagerWorkflow(input: ClusterManagerInput): Promise { +export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): Promise { const manager = new ClusterManager(input.state); // // Message-handling API @@ -31,6 +31,9 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis if (input.numNodes <= 0) { throw new Error(`numNodes must be positive (got ${input.numNodes})`); } + if (input.jobName === '') { + throw new Error('jobName cannot be empty'); + } }, }); @@ -47,7 +50,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // workflow itself simply waits until the cluster is shutdown, or the workflow needs to // continue-as-new. await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); - if (wf.workflowInfo().continueAsNewSuggested) { + if (!manager.state.clusterShutdown) { // You should typically wait for all async handlers to finish before // completing a workflow or continuing as new. If the main workflow method // is scheduling activities or child workflows, then you should typically @@ -55,8 +58,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput): Promis // new. This sample does not schedule any activities or child workflows, so // it is sufficient just to wait for handlers to finish. await wf.condition(wf.allHandlersFinished); - await wf.continueAsNew({ state: manager.getState() }); - return undefined as never; + return await wf.continueAsNew({ state: manager.getState() }); } else { return manager.getStateSummary(); }