Skip to content

Commit 6460fd4

Browse files
authored
Signal workflow API (#902)
1 parent bc3821e commit 6460fd4

File tree

6 files changed

+251
-0
lines changed

6 files changed

+251
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { type NextRequest } from 'next/server';
2+
3+
import { signalWorkflow } from '@/route-handlers/signal-workflow/signal-workflow';
4+
import { type RequestParams } from '@/route-handlers/signal-workflow/signal-workflow.types';
5+
import { routeHandlerWithMiddlewares } from '@/utils/route-handlers-middleware';
6+
import routeHandlersDefaultMiddlewares from '@/utils/route-handlers-middleware/config/route-handlers-default-middlewares.config';
7+
8+
export async function POST(
9+
request: NextRequest,
10+
options: { params: RequestParams['params'] }
11+
) {
12+
return routeHandlerWithMiddlewares(
13+
signalWorkflow,
14+
request,
15+
options,
16+
routeHandlersDefaultMiddlewares
17+
);
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { NextRequest } from 'next/server';
2+
3+
import { GRPCError } from '@/utils/grpc/grpc-error';
4+
import { mockGrpcClusterMethods } from '@/utils/route-handlers-middleware/middlewares/__mocks__/grpc-cluster-methods';
5+
6+
import { signalWorkflow } from '../signal-workflow';
7+
import {
8+
type SignalWorkflowRequestBody,
9+
type Context,
10+
} from '../signal-workflow.types';
11+
12+
const defaultRequestBody = {
13+
signalName: 'test-signal',
14+
signalInput: '"test-input"',
15+
} satisfies SignalWorkflowRequestBody;
16+
17+
describe(signalWorkflow.name, () => {
18+
beforeEach(() => {
19+
jest.clearAllMocks();
20+
});
21+
22+
it('calls signalWorkflow and returns valid response', async () => {
23+
const { res, mockSignalWorkflow } = await setup({});
24+
25+
expect(mockSignalWorkflow).toHaveBeenCalledWith({
26+
domain: 'mock-domain',
27+
workflowExecution: {
28+
workflowId: 'mock-wfid',
29+
runId: 'mock-runid',
30+
},
31+
signalName: 'test-signal',
32+
signalInput: { data: Buffer.from('"test-input"') },
33+
});
34+
35+
const responseJson = await res.json();
36+
expect(responseJson).toEqual({});
37+
});
38+
39+
it('calls signalWorkflow without signalInput when not provided', async () => {
40+
const { mockSignalWorkflow } = await setup({
41+
requestBody: JSON.stringify({
42+
signalName: 'signal-without-input',
43+
}),
44+
});
45+
46+
expect(mockSignalWorkflow).toHaveBeenCalledWith(
47+
expect.objectContaining({
48+
signalName: 'signal-without-input',
49+
signalInput: undefined,
50+
})
51+
);
52+
});
53+
54+
it('returns an error if something went wrong in the backend', async () => {
55+
const { res, mockSignalWorkflow } = await setup({
56+
error: true,
57+
});
58+
59+
expect(mockSignalWorkflow).toHaveBeenCalled();
60+
61+
expect(res.status).toEqual(500);
62+
const responseJson = await res.json();
63+
expect(responseJson).toEqual(
64+
expect.objectContaining({
65+
message: 'Could not signal workflow',
66+
})
67+
);
68+
});
69+
70+
it('returns an error if the signal input has an unexpected format', async () => {
71+
const { res, mockSignalWorkflow } = await setup({
72+
requestBody: JSON.stringify({
73+
signalName: 'test-signal',
74+
signalInput: 'not-an-object', // should be an object
75+
} satisfies SignalWorkflowRequestBody),
76+
});
77+
78+
expect(mockSignalWorkflow).not.toHaveBeenCalled();
79+
80+
const responseJson = await res.json();
81+
expect(responseJson).toEqual(
82+
expect.objectContaining({
83+
message: 'Invalid values provided for workflow signal',
84+
})
85+
);
86+
});
87+
});
88+
89+
async function setup({
90+
requestBody = JSON.stringify(defaultRequestBody),
91+
error,
92+
}: {
93+
requestBody?: string;
94+
error?: true;
95+
}) {
96+
const mockSignalWorkflow = jest
97+
.spyOn(mockGrpcClusterMethods, 'signalWorkflow')
98+
.mockImplementationOnce(async () => {
99+
if (error) {
100+
throw new GRPCError('Could not signal workflow');
101+
}
102+
return {};
103+
});
104+
105+
const res = await signalWorkflow(
106+
new NextRequest('http://localhost', {
107+
method: 'POST',
108+
body: requestBody ?? '{}',
109+
}),
110+
{
111+
params: {
112+
domain: 'mock-domain',
113+
cluster: 'mock-cluster',
114+
workflowId: 'mock-wfid',
115+
runId: 'mock-runid',
116+
},
117+
},
118+
{
119+
grpcClusterMethods: mockGrpcClusterMethods,
120+
} as Context
121+
);
122+
123+
return { res, mockSignalWorkflow };
124+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { z } from 'zod';
2+
3+
import losslessJsonParse from '@/utils/lossless-json-parse';
4+
5+
const signalWorkflowInputSchema = z.string().superRefine((str, ctx) => {
6+
if (!str) return undefined;
7+
8+
try {
9+
return losslessJsonParse(str);
10+
} catch {
11+
ctx.addIssue({ code: 'custom', message: 'Invalid JSON' });
12+
return z.NEVER;
13+
}
14+
});
15+
16+
export default signalWorkflowInputSchema;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { z } from 'zod';
2+
3+
import signalWorkflowInputSchema from './signal-workflow-input-schema';
4+
5+
const signalWorkflowRequestBodySchema = z.object({
6+
signalName: z.string().min(1),
7+
signalInput: signalWorkflowInputSchema.optional(),
8+
});
9+
10+
export default signalWorkflowRequestBodySchema;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { type NextRequest, NextResponse } from 'next/server';
2+
3+
import decodeUrlParams from '@/utils/decode-url-params';
4+
import { getHTTPStatusCode, GRPCError } from '@/utils/grpc/grpc-error';
5+
import logger, { type RouteHandlerErrorPayload } from '@/utils/logger';
6+
7+
import signalWorkflowRequestBodySchema from './schemas/signal-workflow-request-body-schema';
8+
import { type Context, type RequestParams } from './signal-workflow.types';
9+
10+
export async function signalWorkflow(
11+
request: NextRequest,
12+
requestParams: RequestParams,
13+
ctx: Context
14+
) {
15+
const requestBody = await request.json();
16+
const { data, error } =
17+
signalWorkflowRequestBodySchema.safeParse(requestBody);
18+
19+
if (error) {
20+
return NextResponse.json(
21+
{
22+
message: 'Invalid values provided for workflow signal',
23+
validationErrors: error.errors,
24+
},
25+
{ status: 400 }
26+
);
27+
}
28+
29+
const decodedParams = decodeUrlParams(requestParams.params);
30+
31+
try {
32+
const response = await ctx.grpcClusterMethods.signalWorkflow({
33+
domain: decodedParams.domain,
34+
workflowExecution: {
35+
workflowId: decodedParams.workflowId,
36+
runId: decodedParams.runId,
37+
},
38+
signalName: data.signalName,
39+
signalInput: data.signalInput
40+
? { data: Buffer.from(data.signalInput) }
41+
: undefined,
42+
// TODO: add user identity
43+
});
44+
45+
return NextResponse.json(response);
46+
} catch (e) {
47+
logger.error<RouteHandlerErrorPayload>(
48+
{ requestParams: decodedParams, error: e },
49+
'Error signaling workflow'
50+
);
51+
52+
return NextResponse.json(
53+
{
54+
message:
55+
e instanceof GRPCError ? e.message : 'Error signaling workflow',
56+
cause: e,
57+
},
58+
{ status: getHTTPStatusCode(e) }
59+
);
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { type z } from 'zod';
2+
3+
import { type GRPCClusterMethods } from '@/utils/grpc/grpc-client';
4+
5+
import type signalWorkflowRequestBodySchema from './schemas/signal-workflow-request-body-schema';
6+
7+
export type RequestParams = {
8+
params: {
9+
domain: string;
10+
cluster: string;
11+
workflowId: string;
12+
runId: string;
13+
};
14+
};
15+
16+
export type SignalWorkflowRequestBody = z.infer<
17+
typeof signalWorkflowRequestBodySchema
18+
>;
19+
20+
export type Context = {
21+
grpcClusterMethods: GRPCClusterMethods;
22+
};

0 commit comments

Comments
 (0)