Skip to content

Commit af834e3

Browse files
authored
DX-1584: context.invoke and serveMany (#63)
* feat: add typesafe invoke routes * fix: add platforms * feat: add invokeCallback and workflowid to serve response * fix: tests * fix: add example route * feat: try updating workflowId inplace * fix: update original routes and update message * fix: lint * fix: use path for routing instead of header * fix: add example route * feat: add ci route * fix: lint * feat: add serveMany for hono * feat: add serveMany for cloudflare-workers * feat: add serveMany for express * feat: add serveMany for h3 * feat: add astro serveMany * feat: add serveMany for sveltekit * feat: add serveMany for nextjs pages router * fix: add serve-many tests * fix: tests * fix: add invoke count header * fix: add options to serveMany * fix: tests * fix: make serveMany options optional in express * fix: return 404 response * fix: remove callback from createWorkflow result it was needed before for making request payload and return types of workflows typesafe. now that we have routeFunction in the result, we don't need the callback. * fix: add test for invokeWorkflow * fix: send invoke count in all requests * fix: rm invoke count header from context.call * fix: add ci routes back * fix: read invoke count in initial trigger and fix tests * fix: ci errors * fix: failure tests * fix: add zod to dependencies * fix: lint * fix: handle query params in serveMany * fix: fmt * fix: test * fix: add flow control to invoke * fix: fmt
1 parent 4ab0373 commit af834e3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+2341
-765
lines changed

.prettierrc

-13
This file was deleted.

bun.lockb

0 Bytes
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import type { WorkflowContext } from "@upstash/workflow"
2+
import { createWorkflow, serveMany } from "@upstash/workflow/astro"
3+
4+
const workflowOne = createWorkflow(async (context) => {
5+
await context.run("step 1", async () => {
6+
console.log("workflow one says hi")
7+
})
8+
9+
const { body, isCanceled, isFailed } = await context.invoke("invoking other", {
10+
workflow: workflowTwo,
11+
body: "hello from workflow one",
12+
})
13+
14+
await context.run("checking invoke results", () => {
15+
console.log("invoke results", { body, isCanceled, isFailed })
16+
})
17+
18+
await context.run("step 2", async () => {
19+
console.log("workflow one says bye")
20+
})
21+
}, {
22+
// env must be passed in astro.
23+
// for local dev, we need import.meta.env.
24+
// For deployment, we need process.env:
25+
env: {
26+
...process.env,
27+
...import.meta.env
28+
}
29+
})
30+
31+
const workflowTwo = createWorkflow(async (context: WorkflowContext<string>) => {
32+
await context.run("step 1", async () => {
33+
console.log("workflow two says hi")
34+
})
35+
36+
await context.run("step 2", async () => {
37+
console.log("workflow two says bye")
38+
})
39+
40+
return "workflow two done"
41+
}, {
42+
retries: 0,
43+
// env must be passed in astro.
44+
// for local dev, we need import.meta.env.
45+
// For deployment, we need process.env:
46+
env: {
47+
...process.env,
48+
...import.meta.env
49+
}
50+
})
51+
52+
export const { POST } = serveMany({
53+
workflowOne,
54+
workflowTwo
55+
})

examples/ci/app/ci/constants.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ export const TEST_ROUTES: Pick<TestConfig, "route" | "waitForSeconds">[] = [
9696
route: "call/workflow-with-failureUrl",
9797
waitForSeconds: 5
9898
},
99-
99+
{
100+
route: "invoke/workflows/workflowOne",
101+
waitForSeconds: 10
102+
}
103+
100104
/**
101105
* TEST LARGE PAYLOAD CASES
102106
*

examples/ci/app/ci/utils.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,9 @@ export const getTestConfig = async (route: string) => {
5656
if (response.status !== 200) {
5757
throw new Error(`Failed to get the error config: ${response.statusText}`)
5858
}
59-
6059

6160
const testConfig = await response.json() as Parameters<typeof testServe>[1]
62-
61+
6362
return testConfig
6463
}
6564

@@ -84,7 +83,7 @@ export const initiateTest = async (route: string) => {
8483
await redis.checkRedisForResults(route, randomTestId, expectedCallCount, expectedResult)
8584
}
8685

87-
type ExpectType = number | string | object | undefined | void | boolean
86+
type ExpectType = number | string | object | undefined | void | boolean | null
8887
export const expect = <TObject extends ExpectType = ExpectType>(
8988
received: TObject,
9089
expected: TObject

examples/ci/app/test-routes/failureUrl/third-party/route.ts

+8-5
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@ export const POST = async (request: Request) => {
77
const result = await request.json() as {
88
body: string,
99
header: Record<string, string[]>,
10-
sourceHeader: Record<string, string[]>,
1110
workflowRunId: string
1211
}
13-
12+
1413
const errorMessage = atob(result.body)
1514
expect(errorMessage, `{"error":"Error","message":"${ERROR_MESSAGE}"}`)
16-
expect(result.sourceHeader[HEADER][0], HEADER_VALUE)
15+
expect(request.headers.get(HEADER), HEADER_VALUE)
1716

1817
// get id and route
19-
const randomTestId = result.sourceHeader[CI_RANDOM_ID_HEADER][0]
20-
const route = result.sourceHeader[CI_ROUTE_HEADER][0]
18+
const randomTestId = request.headers.get(CI_RANDOM_ID_HEADER)
19+
const route = request.headers.get(CI_ROUTE_HEADER)
20+
21+
if (!route || !randomTestId || !errorMessage) {
22+
throw new Error(`failed to get route, randomTestId or errorMessage. route: ${route}, randomTestId: ${randomTestId}, errorMessage: ${errorMessage}`)
23+
}
2124

2225
await saveResultsWithoutContext(
2326
route, randomTestId, errorMessage
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { expect } from "app/ci/utils";
2+
3+
export const GET = async (request: Request) => {
4+
expect(request.headers.get("upstash-workflow-invoke-count"), null)
5+
return new Response(JSON.stringify({}), { status: 200 });
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
import { WorkflowContext } from "@upstash/workflow";
2+
import { createWorkflow, serveMany } from "@upstash/workflow/nextjs";
3+
import { CI_RANDOM_ID_HEADER, CI_ROUTE_HEADER, TEST_ROUTE_PREFIX } from "app/ci/constants";
4+
import { saveResult } from "app/ci/upstash/redis";
5+
import { expect, nanoid, testServe } from "app/ci/utils";
6+
import { z } from "zod";
7+
8+
const testHeader = `test-header-foo`
9+
const headerValue = `header-foo`
10+
const payload = 123
11+
12+
13+
const invokePayload = "invoke-payload"
14+
const invokeResult = "invoke-result"
15+
16+
const invokeHeader = "invoke-header"
17+
const invokeHeaderValue = "invoke-header-value"
18+
19+
const workflowRunIdHeader = "workflow-run-id-header"
20+
21+
const workflowOne = createWorkflow(async (context: WorkflowContext<number>) => {
22+
const workflowRunId = await context.run("step 1", async () => {
23+
console.log("workflow one says hi")
24+
return nanoid()
25+
})
26+
27+
const { body, isCanceled, isFailed } = await context.invoke("invoking other", {
28+
workflow: workflowTwo,
29+
body: invokePayload,
30+
headers: {
31+
[invokeHeader]: invokeHeaderValue,
32+
[workflowRunIdHeader]: workflowRunId,
33+
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
34+
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
35+
},
36+
workflowRunId,
37+
})
38+
39+
expect(body, invokeResult)
40+
expect(isCanceled, false)
41+
expect(isFailed, false)
42+
43+
const { body: failingBody, isCanceled: failingIsCanceled, isFailed: failingIsFailed } = await context.invoke("invoke failing", {
44+
workflow: workflowThree,
45+
body: invokePayload,
46+
headers: {
47+
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
48+
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
49+
},
50+
retries: 0
51+
})
52+
53+
expect(failingBody, undefined)
54+
expect(failingIsCanceled, false)
55+
expect(failingIsFailed, true)
56+
57+
await context.run("step 2", async () => {
58+
console.log("workflow one says bye")
59+
})
60+
61+
62+
await saveResult(
63+
context,
64+
"done invoke"
65+
)
66+
}, {
67+
schema: z.number()
68+
})
69+
70+
const workflowTwo = createWorkflow(async (context: WorkflowContext<string>) => {
71+
expect(context.requestPayload, invokePayload)
72+
expect(context.headers.get(invokeHeader) as string, invokeHeaderValue)
73+
expect(`wfr_${context.headers.get(workflowRunIdHeader)}`, context.workflowRunId)
74+
75+
await context.run("step 1", async () => {
76+
console.log("workflow two says hi")
77+
})
78+
79+
// @ts-expect-error accessing private fields for testing purposes.
80+
// We also check after the first step, because DisabledWorkflowContext
81+
// doesn't have the correct invokeCount
82+
const invokeCount = context.executor.invokeCount
83+
expect(invokeCount, 1)
84+
85+
await context.run("step 2", async () => {
86+
console.log("workflow two says bye")
87+
})
88+
89+
const result = await Promise.all([
90+
context.invoke("invoke branch one", {
91+
workflow: branchOne,
92+
body: 1,
93+
headers: {
94+
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
95+
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
96+
}
97+
}),
98+
context.invoke("invoke branch two", {
99+
workflow: branchTwo,
100+
body: 2,
101+
headers: {
102+
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
103+
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
104+
}
105+
})
106+
])
107+
108+
expect(result[0].body, "branch-one-result")
109+
expect(result[1].body, "branch-two-result")
110+
111+
return invokeResult
112+
})
113+
114+
const workflowThree = createWorkflow(async (context: WorkflowContext<string>) => {
115+
expect(context.requestPayload, invokePayload)
116+
throw new Error("what")
117+
}, {
118+
retries: 0
119+
})
120+
121+
/**
122+
* wait for event workflows
123+
*/
124+
125+
const thirdPartyEndpoint = `${TEST_ROUTE_PREFIX}/invoke/called-endpoint`
126+
const notifiedEventId = "notifiedEvent"
127+
128+
/**
129+
* calls waitForEvent and checks invokeCount
130+
*/
131+
const branchOne = createWorkflow(async (context: WorkflowContext<number>) => {
132+
const { timeout } = await context.waitForEvent("timeoutEvent", "timeoutEvent", { timeout: 1 })
133+
expect(timeout, true)
134+
135+
// @ts-expect-error accessing private fields for testing purposes.
136+
// We also check after the first step, because DisabledWorkflowContext
137+
// doesn't have the correct invokeCount
138+
const invokeCount = context.executor.invokeCount
139+
expect(invokeCount, 2)
140+
141+
const { timeout: isTimeout } = await context.waitForEvent("notified event", notifiedEventId, { timeout: "10s" })
142+
expect(isTimeout, false)
143+
144+
await context.sleep("check", 1)
145+
146+
return "branch-one-result"
147+
})
148+
149+
/**
150+
* notifies branhcOne, calls context.call and checks invokeCount
151+
*/
152+
const branchTwo = createWorkflow(async (context: WorkflowContext<number>) => {
153+
154+
const { status } = await context.call("call", {
155+
url: thirdPartyEndpoint,
156+
method: "GET",
157+
})
158+
159+
expect(status, 200)
160+
161+
// @ts-expect-error accessing private fields for testing purposes.
162+
// We also check after the first step, because DisabledWorkflowContext
163+
// doesn't have the correct invokeCount
164+
const invokeCount = context.executor.invokeCount
165+
expect(invokeCount, 2)
166+
167+
let counter = 0;
168+
while (counter < 10) {
169+
const { notifyResponse } = await context.notify("notified event", notifiedEventId, "data")
170+
counter += 1
171+
await context.sleep("wait", 1)
172+
if (notifyResponse.length) {
173+
break
174+
}
175+
}
176+
177+
await context.sleep("check", 1)
178+
179+
return "branch-two-result"
180+
})
181+
182+
export const { POST, GET } = testServe(
183+
serveMany({
184+
workflowOne,
185+
workflowTwo,
186+
workflowThree,
187+
branchOne,
188+
branchTwo,
189+
}),
190+
{
191+
expectedCallCount: 26,
192+
expectedResult: "done invoke",
193+
payload,
194+
headers: {
195+
[testHeader]: headerValue,
196+
}
197+
}
198+
)

examples/ci/app/test-routes/returns-before-step/route.ts

+22-17
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,41 @@ export const { POST, GET } = testServe(
1010
serve<string>(
1111
async (context) => {
1212

13-
const redisKey = `redis-key-${context.headers.get(CI_RANDOM_ID_HEADER)}`
13+
const randomId = context.headers.get(CI_RANDOM_ID_HEADER)
14+
if (!randomId) {
15+
throw new Error("randomId not found")
16+
}
17+
const redisKey = `redis-key-${randomId}`
1418
const count = await redis.incr(redisKey)
19+
1520
if (count === 1) {
1621
// allow in the first encounter
17-
await context.run("mock step", () => {})
22+
await context.run("mock step", () => { })
1823
} else if (count === 2) {
1924
// return after the step, which should return 400
2025
return
2126
} else if (count === 3) {
2227
// coming back for failureFunction. put a mock step to allow it
23-
await context.run("mock step", () => {})
28+
await context.run("mock step", () => { })
2429
}
2530

2631
// otherwise fail.
2732
await fail(context);
2833
}, {
29-
baseUrl: BASE_URL,
30-
retries: 0,
31-
async failureFunction({ context, failStatus, failResponse }) {
32-
expect(failStatus, 400)
33-
expect(failResponse, `Failed to authenticate Workflow request. If this is unexpected, see the caveat https://upstash.com/docs/workflow/basics/caveats#avoid-non-deterministic-code-outside-context-run`)
34-
await saveResult(
35-
context as WorkflowContext,
36-
secret
37-
)
38-
}
34+
baseUrl: BASE_URL,
35+
retries: 0,
36+
async failureFunction({ context, failStatus, failResponse }) {
37+
expect(failStatus, 400)
38+
expect(failResponse, `Failed to authenticate Workflow request. If this is unexpected, see the caveat https://upstash.com/docs/workflow/basics/caveats#avoid-non-deterministic-code-outside-context-run`)
39+
await saveResult(
40+
context as WorkflowContext,
41+
secret
42+
)
3943
}
40-
), {
41-
expectedCallCount: 3,
42-
expectedResult: secret,
43-
payload: undefined,
4444
}
45+
), {
46+
expectedCallCount: 3,
47+
expectedResult: secret,
48+
payload: undefined,
49+
}
4550
)

0 commit comments

Comments
 (0)