Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions message-passing/safe-message-handlers/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ export async function startClusterManager(): Promise<WorkflowHandle<typeof clust
return client.workflow.start(clusterManagerWorkflow, {
taskQueue: 'safe-message-handlers-task-queue',
workflowId: `cm-${nanoid()}`,
args: [{ testContinueAsNew: false }],
});
}
20 changes: 18 additions & 2 deletions message-passing/safe-message-handlers/src/cluster-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ClusterState,
ClusterManagerStateSummary,
DeleteJobUpdateInput,
ClusterManagerInput,
} from './types';

const { assignNodesToJob, unassignNodesForJob, startCluster, shutdownCluster } = wf.proxyActivities<typeof activities>({
Expand All @@ -25,15 +26,17 @@ export class ClusterManager {
state: ClusterManagerState;
seenJobs: Set<string>;
nodesMutex: Mutex;
private maxHistoryLength?: number;

constructor(state?: ClusterManagerState) {
this.state = state ?? {
constructor(input: ClusterManagerInput = {}) {
this.state = input.state ?? {
clusterState: ClusterState.NOT_STARTED,
nodes: new Map<string, string | null>(),
maxAssignedNodes: 0,
};
this.nodesMutex = new Mutex();
this.seenJobs = new Set<string>();
this.maxHistoryLength = input.testContinueAsNew ? 120 : undefined;
}

async startCluster(): Promise<void> {
Expand Down Expand Up @@ -145,4 +148,17 @@ export class ClusterManager {
})
);
}

shouldContinueAsNew(): boolean {
if (wf.workflowInfo().continueAsNewSuggested) {
return true;
}

// This is just for ease-of-testing. In production, we trust temporal to tell us when to continue-as-new.
if (this.maxHistoryLength !== undefined && wf.workflowInfo().historyLength > this.maxHistoryLength) {
return true;
}

return false;
}
}
1 change: 1 addition & 0 deletions message-passing/safe-message-handlers/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface ClusterManagerState {

export interface ClusterManagerInput {
state?: ClusterManagerState;
testContinueAsNew?: boolean;
}

export interface ClusterManagerStateSummary {
Expand Down
9 changes: 6 additions & 3 deletions message-passing/safe-message-handlers/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const deleteJobUpdate = wf.defineUpdate<void, [DeleteJobUpdateInput]>('de
export const getClusterStatusQuery = wf.defineQuery<ClusterManagerStateSummary>('getClusterStatus');

export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): Promise<ClusterManagerStateSummary> {
const manager = new ClusterManager(input.state);
const manager = new ClusterManager(input);
//
// Message-handling API
//
Expand Down Expand Up @@ -52,7 +52,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
// continue-as-new.
await wf.condition(() => manager.state.clusterState === ClusterState.STARTED);
await wf.condition(
() => manager.state.clusterState === ClusterState.SHUTTING_DOWN || wf.workflowInfo().continueAsNewSuggested
() => manager.state.clusterState === ClusterState.SHUTTING_DOWN || manager.shouldContinueAsNew()
);
if (manager.state.clusterState !== ClusterState.SHUTTING_DOWN) {
// You should typically wait for all async handlers to finish before
Expand All @@ -62,7 +62,10 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
// new. This sample does not schedule any activities or child workflows, so
// it is sufficient just to wait for handlers to finish.
await wf.condition(wf.allHandlersFinished);
return await wf.continueAsNew<typeof clusterManagerWorkflow>({ state: manager.getState() });
return await wf.continueAsNew<typeof clusterManagerWorkflow>({
state: manager.getState(),
testContinueAsNew: input.testContinueAsNew
});
} else {
return manager.getStateSummary();
}
Expand Down