diff --git a/message-passing/safe-message-handlers/src/client.ts b/message-passing/safe-message-handlers/src/client.ts index a42b3965..8ba05a6c 100644 --- a/message-passing/safe-message-handlers/src/client.ts +++ b/message-passing/safe-message-handlers/src/client.ts @@ -9,5 +9,6 @@ export async function startClusterManager(): Promise({ @@ -25,15 +26,17 @@ export class ClusterManager { state: ClusterManagerState; seenJobs: Set; nodesMutex: Mutex; + private maxHistoryLength?: number; - constructor(state?: ClusterManagerState) { - this.state = state ?? { + constructor(input: ClusterManagerInput = {}) { + this.state = input.state ?? { clusterState: ClusterState.NOT_STARTED, nodes: new Map(), maxAssignedNodes: 0, }; this.nodesMutex = new Mutex(); this.seenJobs = new Set(); + this.maxHistoryLength = input.testContinueAsNew ? 120 : undefined; } async startCluster(): Promise { @@ -145,4 +148,17 @@ export class ClusterManager { }) ); } + + shouldContinueAsNew(): boolean { + if (wf.workflowInfo().continueAsNewSuggested) { + return true; + } + + // This is just for ease-of-testing. In production, we trust temporal to tell us when to continue-as-new. + if (this.maxHistoryLength !== undefined && wf.workflowInfo().historyLength > this.maxHistoryLength) { + return true; + } + + return false; + } } diff --git a/message-passing/safe-message-handlers/src/types.ts b/message-passing/safe-message-handlers/src/types.ts index 50fecb66..26864cd1 100644 --- a/message-passing/safe-message-handlers/src/types.ts +++ b/message-passing/safe-message-handlers/src/types.ts @@ -6,6 +6,7 @@ export interface ClusterManagerState { export interface ClusterManagerInput { state?: ClusterManagerState; + testContinueAsNew?: boolean; } export interface ClusterManagerStateSummary { diff --git a/message-passing/safe-message-handlers/src/workflows.ts b/message-passing/safe-message-handlers/src/workflows.ts index 84c99e9b..a2c59f47 100644 --- a/message-passing/safe-message-handlers/src/workflows.ts +++ b/message-passing/safe-message-handlers/src/workflows.ts @@ -17,7 +17,7 @@ export const deleteJobUpdate = wf.defineUpdate('de export const getClusterStatusQuery = wf.defineQuery('getClusterStatus'); export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): Promise { - const manager = new ClusterManager(input.state); + const manager = new ClusterManager(input); // // Message-handling API // @@ -52,7 +52,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P // continue-as-new. await wf.condition(() => manager.state.clusterState === ClusterState.STARTED); await wf.condition( - () => manager.state.clusterState === ClusterState.SHUTTING_DOWN || wf.workflowInfo().continueAsNewSuggested + () => manager.state.clusterState === ClusterState.SHUTTING_DOWN || manager.shouldContinueAsNew() ); if (manager.state.clusterState !== ClusterState.SHUTTING_DOWN) { // You should typically wait for all async handlers to finish before @@ -62,7 +62,10 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P // new. This sample does not schedule any activities or child workflows, so // it is sufficient just to wait for handlers to finish. await wf.condition(wf.allHandlersFinished); - return await wf.continueAsNew({ state: manager.getState() }); + return await wf.continueAsNew({ + state: manager.getState(), + testContinueAsNew: input.testContinueAsNew + }); } else { return manager.getStateSummary(); }