Skip to content

Add signal workflow to workflow actions #904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { type NextRequest } from 'next/server';

import { signalWorkflow } from '@/route-handlers/signal-workflow/signal-workflow';
import { type RequestParams } from '@/route-handlers/signal-workflow/signal-workflow.types';
import { routeHandlerWithMiddlewares } from '@/utils/route-handlers-middleware';
import routeHandlersDefaultMiddlewares from '@/utils/route-handlers-middleware/config/route-handlers-default-middlewares.config';

export async function POST(
request: NextRequest,
options: { params: RequestParams['params'] }
) {
return routeHandlerWithMiddlewares(
signalWorkflow,
request,
options,
routeHandlersDefaultMiddlewares
);
}
1 change: 1 addition & 0 deletions src/config/dynamic/resolvers/schemas/resolver-schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const resolverSchemas: ResolverSchemas = {
returnType: z.object({
cancel: workflowActionsEnabledValueSchema,
terminate: workflowActionsEnabledValueSchema,
signal: workflowActionsEnabledValueSchema,
restart: workflowActionsEnabledValueSchema,
reset: workflowActionsEnabledValueSchema,
}),
Expand Down
1 change: 1 addition & 0 deletions src/config/dynamic/resolvers/workflow-actions-enabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ export default async function workflowActionsEnabled(
cancel: 'ENABLED',
restart: 'ENABLED',
reset: 'ENABLED',
signal: 'ENABLED',
};
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type WORKFLOW_ACTIONS_DISABLED_VALUES_CONFIG from './workflow-actions-disabled-values.config';

export type WorkflowActionID = 'cancel' | 'terminate' | 'restart' | 'reset';
export type WorkflowActionID =
| 'cancel'
| 'terminate'
| 'restart'
| 'reset'
| 'signal';

export type WorkflowActionsEnabledResolverParams = {
domain: string;
Expand Down
124 changes: 124 additions & 0 deletions src/route-handlers/signal-workflow/__tests__/signal-workflow.node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { NextRequest } from 'next/server';

import { GRPCError } from '@/utils/grpc/grpc-error';
import { mockGrpcClusterMethods } from '@/utils/route-handlers-middleware/middlewares/__mocks__/grpc-cluster-methods';

import { signalWorkflow } from '../signal-workflow';
import {
type SignalWorkflowRequestBody,
type Context,
} from '../signal-workflow.types';

const defaultRequestBody = {
signalName: 'test-signal',
signalInput: '"test-input"',
} satisfies SignalWorkflowRequestBody;

describe(signalWorkflow.name, () => {
beforeEach(() => {
jest.clearAllMocks();
});

it('calls signalWorkflow and returns valid response', async () => {
const { res, mockSignalWorkflow } = await setup({});

expect(mockSignalWorkflow).toHaveBeenCalledWith({
domain: 'mock-domain',
workflowExecution: {
workflowId: 'mock-wfid',
runId: 'mock-runid',
},
signalName: 'test-signal',
signalInput: { data: Buffer.from('"test-input"') },
});

const responseJson = await res.json();
expect(responseJson).toEqual({});
});

it('calls signalWorkflow without signalInput when not provided', async () => {
const { mockSignalWorkflow } = await setup({
requestBody: JSON.stringify({
signalName: 'signal-without-input',
}),
});

expect(mockSignalWorkflow).toHaveBeenCalledWith(
expect.objectContaining({
signalName: 'signal-without-input',
signalInput: undefined,
})
);
});

it('returns an error if something went wrong in the backend', async () => {
const { res, mockSignalWorkflow } = await setup({
error: true,
});

expect(mockSignalWorkflow).toHaveBeenCalled();

expect(res.status).toEqual(500);
const responseJson = await res.json();
expect(responseJson).toEqual(
expect.objectContaining({
message: 'Could not signal workflow',
})
);
});

it('returns an error if the signal input has an unexpected format', async () => {
const { res, mockSignalWorkflow } = await setup({
requestBody: JSON.stringify({
signalName: 'test-signal',
signalInput: 'not-an-object', // should be an object
} satisfies SignalWorkflowRequestBody),
});

expect(mockSignalWorkflow).not.toHaveBeenCalled();

const responseJson = await res.json();
expect(responseJson).toEqual(
expect.objectContaining({
message: 'Invalid values provided for workflow signal',
})
);
});
});

async function setup({
requestBody = JSON.stringify(defaultRequestBody),
error,
}: {
requestBody?: string;
error?: true;
}) {
const mockSignalWorkflow = jest
.spyOn(mockGrpcClusterMethods, 'signalWorkflow')
.mockImplementationOnce(async () => {
if (error) {
throw new GRPCError('Could not signal workflow');
}
return {};
});

const res = await signalWorkflow(
new NextRequest('http://localhost', {
method: 'POST',
body: requestBody ?? '{}',
}),
{
params: {
domain: 'mock-domain',
cluster: 'mock-cluster',
workflowId: 'mock-wfid',
runId: 'mock-runid',
},
},
{
grpcClusterMethods: mockGrpcClusterMethods,
} as Context
);

return { res, mockSignalWorkflow };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { z } from 'zod';

import losslessJsonParse from '@/utils/lossless-json-parse';

const signalWorkflowInputSchema = z.string().superRefine((str, ctx) => {
if (!str) return undefined;

try {
return losslessJsonParse(str);
} catch {
ctx.addIssue({ code: 'custom', message: 'Invalid JSON' });
return z.NEVER;
}
});

export default signalWorkflowInputSchema;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { z } from 'zod';

import signalWorkflowInputSchema from './signal-workflow-input-schema';

const signalWorkflowRequestBodySchema = z.object({
signalName: z.string().min(1),
signalInput: signalWorkflowInputSchema.optional(),
});

export default signalWorkflowRequestBodySchema;
61 changes: 61 additions & 0 deletions src/route-handlers/signal-workflow/signal-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { type NextRequest, NextResponse } from 'next/server';

import decodeUrlParams from '@/utils/decode-url-params';
import { getHTTPStatusCode, GRPCError } from '@/utils/grpc/grpc-error';
import logger, { type RouteHandlerErrorPayload } from '@/utils/logger';

import signalWorkflowRequestBodySchema from './schemas/signal-workflow-request-body-schema';
import { type Context, type RequestParams } from './signal-workflow.types';

export async function signalWorkflow(
request: NextRequest,
requestParams: RequestParams,
ctx: Context
) {
const requestBody = await request.json();
const { data, error } =
signalWorkflowRequestBodySchema.safeParse(requestBody);

if (error) {
return NextResponse.json(
{
message: 'Invalid values provided for workflow signal',
validationErrors: error.errors,
},
{ status: 400 }
);
}

const decodedParams = decodeUrlParams(requestParams.params);

try {
const response = await ctx.grpcClusterMethods.signalWorkflow({
domain: decodedParams.domain,
workflowExecution: {
workflowId: decodedParams.workflowId,
runId: decodedParams.runId,
},
signalName: data.signalName,
signalInput: data.signalInput
? { data: Buffer.from(data.signalInput) }
: undefined,
// TODO: add user identity
});

return NextResponse.json(response);
} catch (e) {
logger.error<RouteHandlerErrorPayload>(
{ requestParams: decodedParams, error: e },
'Error signaling workflow'
);

return NextResponse.json(
{
message:
e instanceof GRPCError ? e.message : 'Error signaling workflow',
cause: e,
},
{ status: getHTTPStatusCode(e) }
);
}
}
22 changes: 22 additions & 0 deletions src/route-handlers/signal-workflow/signal-workflow.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { type z } from 'zod';

import { type GRPCClusterMethods } from '@/utils/grpc/grpc-client';

import type signalWorkflowRequestBodySchema from './schemas/signal-workflow-request-body-schema';

export type RequestParams = {
params: {
domain: string;
cluster: string;
workflowId: string;
runId: string;
};
};

export type SignalWorkflowRequestBody = z.infer<
typeof signalWorkflowRequestBodySchema
>;

export type Context = {
grpcClusterMethods: GRPCClusterMethods;
};
1 change: 1 addition & 0 deletions src/utils/config/__fixtures__/resolved-config-values.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const mockResolvedConfigValues: LoadedConfigResolvedValues = {
WORKFLOW_ACTIONS_ENABLED: {
terminate: 'ENABLED',
cancel: 'ENABLED',
signal: 'ENABLED',
restart: 'ENABLED',
reset: 'ENABLED',
},
Expand Down
39 changes: 39 additions & 0 deletions src/views/workflow-actions/config/workflow-actions.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
MdPowerSettingsNew,
MdOutlineRestartAlt,
MdRefresh,
MdOutlineWifiTethering,
} from 'react-icons/md';

import { type CancelWorkflowResponse } from '@/route-handlers/cancel-workflow/cancel-workflow.types';
Expand All @@ -20,6 +21,12 @@ import {
type ResetWorkflowSubmissionData,
type ResetWorkflowFormData,
} from '../workflow-action-reset-form/workflow-action-reset-form.types';
import { signalWorkflowFormSchema } from '../workflow-action-signal-form/schemas/signal-workflow-form-schema';
import WorkflowActionSignalForm from '../workflow-action-signal-form/workflow-action-signal-form';
import {
type SignalWorkflowSubmissionData,
type SignalWorkflowFormData,
} from '../workflow-action-signal-form/workflow-action-signal-form.types';
import { type WorkflowAction } from '../workflow-actions.types';

const cancelWorkflowActionConfig: WorkflowAction<CancelWorkflowResponse> = {
Expand Down Expand Up @@ -69,6 +76,37 @@ const terminateWorkflowActionConfig: WorkflowAction<TerminateWorkflowResponse> =
renderSuccessMessage: () => 'Workflow has been terminated.',
};

const signalWorkflowActionConfig: WorkflowAction<
unknown,
SignalWorkflowFormData,
SignalWorkflowSubmissionData
> = {
id: 'signal',
label: 'Signal',
subtitle: 'Send a signal to the workflow',
modal: {
text: 'Provide data to running workflows using signals',
docsLink: {
text: 'Learn more about signals',
href: 'https://cadenceworkflow.io/docs/go-client/signals',
},
withForm: true,
form: WorkflowActionSignalForm,
formSchema: signalWorkflowFormSchema,
transformFormDataToSubmission: (formData) => formData,
},
icon: MdOutlineWifiTethering,
getRunnableStatus: (workflow) =>
getWorkflowIsCompleted(
workflow.workflowExecutionInfo?.closeEvent?.attributes ?? ''
)
? 'NOT_RUNNABLE_WORKFLOW_CLOSED'
: 'RUNNABLE',
apiRoute: 'signal',
renderSuccessMessage: ({ inputParams }) =>
`Successfully sent signal "${inputParams.submissionData.signalName}"`,
};

const restartWorkflowActionConfig: WorkflowAction<RestartWorkflowResponse> = {
id: 'restart',
label: 'Restart',
Expand Down Expand Up @@ -136,6 +174,7 @@ export const resetWorkflowActionConfig: WorkflowAction<
const workflowActionsConfig = [
cancelWorkflowActionConfig,
terminateWorkflowActionConfig,
signalWorkflowActionConfig,
restartWorkflowActionConfig,
resetWorkflowActionConfig,
] as const satisfies WorkflowAction<any, any, any>[];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ export type Props = WorkflowActionSuccessMessageProps<
{ runId: string }
> & {
successMessage: string;
onDismissMessage: () => void;
};
Loading