diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c1605085..48557ef5 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -97,7 +97,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Install Dependencies run: pnpm install @@ -294,7 +294,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Install Dependencies run: pnpm install @@ -683,7 +683,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Deploy run: | @@ -714,7 +714,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Install example run: | @@ -768,7 +768,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Deploy run: | @@ -799,7 +799,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Deploy run: | @@ -830,7 +830,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Deploy run: | @@ -861,7 +861,7 @@ jobs: - uses: pnpm/action-setup@v2 with: - version: latest + version: 10 - name: Deploy run: | diff --git a/examples/ci/app/ci/constants.ts b/examples/ci/app/ci/constants.ts index eba6db28..1db63968 100644 --- a/examples/ci/app/ci/constants.ts +++ b/examples/ci/app/ci/constants.ts @@ -119,6 +119,9 @@ export const TEST_ROUTES: Pick[] = [ }, { route: "invoke-parent-failure/workflow", + }, + { + route: "retried", } /** diff --git a/examples/ci/app/test-routes/retried/route.ts b/examples/ci/app/test-routes/retried/route.ts new file mode 100644 index 00000000..a5a87797 --- /dev/null +++ b/examples/ci/app/test-routes/retried/route.ts @@ -0,0 +1,53 @@ +import { serve } from "@upstash/workflow/nextjs"; +import { BASE_URL } from "app/ci/constants"; +import { testServe, expect } from "app/ci/utils"; +import { saveResult } from "app/ci/upstash/redis" + +const header = `test-header-foo` +const headerValue = `header-bar` +const payload = "retried-payload" + +export const { POST, GET } = testServe( + serve( + async (context) => { + const input = context.requestPayload; + + expect(input, payload); + expect(context.headers.get(header)!, headerValue) + + // context.retried reflects the QStash `Upstash-Retried` header for the + // current delivery. It must be a number. + expect(typeof context.retried, "number") + + // On the first delivery of this step QStash sets Upstash-Retried: 0, + // we throw to force a retry. On the retry QStash sets it to 1 and we + // assert the new value and complete the step. + const observedRetried = await context.run("force-retry", () => { + if (context.retried === 0) { + throw new Error("forcing-retry") + } + return context.retried + }) + + expect(observedRetried, 1) + + await saveResult( + context, + `retried=${observedRetried}` + ) + }, { + baseUrl: BASE_URL, + } + ), { + // first delivery + retried delivery + post-step continuation + expectedCallCount: 3, + expectedResult: "retried=1", + payload, + headers: { + [ header ]: headerValue + }, + triggerConfig: { + retries: 1, + } + } +) diff --git a/examples/express/api/index.ts b/examples/express/api/index.ts index 9fd1a5f8..405e542f 100644 --- a/examples/express/api/index.ts +++ b/examples/express/api/index.ts @@ -169,6 +169,10 @@ const ciRouter = serve( app.post("/ci", ciRouter); -app.listen(3001, () => { - console.log('Server running on port 3001'); -}); \ No newline at end of file +if (require.main === module) { + app.listen(3001, () => { + console.log('Server running on port 3001'); + }); +} + +export default app; \ No newline at end of file diff --git a/examples/express/ci.test.ts b/examples/express/ci.test.ts index 776f4a01..cd66e1da 100644 --- a/examples/express/ci.test.ts +++ b/examples/express/ci.test.ts @@ -5,7 +5,7 @@ import { describe, test, expect } from "vitest" import { RedisEntry } from "./api" import "dotenv/config" -export const RETRY_COUNT = 10 +export const RETRY_COUNT = 20 export const RETRY_INTERVAL_DURATION = 1000 export const CHECK_WF_AFTER_INIT_DURATION = 10000 const TEST_BUFFER_DURATION = 5000 @@ -157,13 +157,22 @@ const testEndpoint = ({ expect(result?.result).toBe(expectedResult) if (expectedLog) { - const logs = await client.logs({ - workflowRunId - }) - + let logs: Awaited> | undefined + for (let i=1; i<=RETRY_COUNT; i++) { + logs = await client.logs({ + workflowRunId + }) + if (logs.runs.length === 1) { + break + } + if (i!==RETRY_COUNT) { + await new Promise(r => setTimeout(r, RETRY_INTERVAL_DURATION)); + } + } + expect(logs).toBeDefined() - expect(logs.runs.length).toBe(1) - expect(logs.runs[0]).toEqual(expectedLog) + expect(logs!.runs.length).toBe(1) + expect(logs!.runs[0]).toEqual(expectedLog) } }, TEST_TIMEOUT_DURATION) } diff --git a/examples/nextjs-pages/ci.test.ts b/examples/nextjs-pages/ci.test.ts index 705b2d61..f16861a8 100644 --- a/examples/nextjs-pages/ci.test.ts +++ b/examples/nextjs-pages/ci.test.ts @@ -6,7 +6,7 @@ import { Redis } from "@upstash/redis" import { serve } from "@upstash/workflow/nextjs" import { describe, test, expect } from "vitest" -export const RETRY_COUNT = 10 +export const RETRY_COUNT = 20 export const RETRY_INTERVAL_DURATION = 1000 export const CHECK_WF_AFTER_INIT_DURATION = 10000 const TEST_BUFFER_DURATION = 5000 @@ -169,13 +169,22 @@ const testEndpoint = ({ expect(result?.result).toBe(expectedResult) if (expectedLog) { - const logs = await client.logs({ - workflowRunId - }) - + let logs: Awaited> | undefined + for (let i=1; i<=RETRY_COUNT; i++) { + logs = await client.logs({ + workflowRunId + }) + if (logs.runs.length === 1) { + break + } + if (i!==RETRY_COUNT) { + await new Promise(r => setTimeout(r, RETRY_INTERVAL_DURATION)); + } + } + expect(logs).toBeDefined() - expect(logs.runs.length).toBe(1) - expect(logs.runs[0]).toEqual(expectedLog) + expect(logs!.runs.length).toBe(1) + expect(logs!.runs[0]).toEqual(expectedLog) } }, TEST_TIMEOUT_DURATION) } diff --git a/src/constants.ts b/src/constants.ts index cfbcdc9b..cbd07cb5 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -8,6 +8,7 @@ export const WORKFLOW_FAILURE_HEADER = "Upstash-Workflow-Is-Failure"; export const WORKFLOW_FAILURE_CALLBACK_HEADER = "Upstash-Workflow-Failure-Callback"; export const WORKFLOW_FEATURE_HEADER = "Upstash-Feature-Set"; export const WORKFLOW_INVOKE_COUNT_HEADER = "Upstash-Workflow-Invoke-Count"; +export const WORKFLOW_RETRIED_HEADER = "Upstash-Retried"; export const WORKFLOW_LABEL_HEADER = "Upstash-Label"; export const WORKFLOW_UNKOWN_SDK_VERSION_HEADER = "Upstash-Workflow-Unknown-Sdk"; export const WORKFLOW_UNKOWN_SDK_TRIGGER_HEADER = "upstash-workflow-trigger-by-sdk"; diff --git a/src/context/context.ts b/src/context/context.ts index 1fa4a896..34ffbe70 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -155,6 +155,14 @@ export class WorkflowContext { */ public readonly label?: string; + /** + * Number of times QStash has retried delivering the current request. + * + * Sourced from the `Upstash-Retried` header. `0` on the first delivery, + * `1` on the first retry, `2` on the second, and so on. + */ + public readonly retried: number; + constructor({ qstashClient, workflowRunId, @@ -167,6 +175,7 @@ export class WorkflowContext { telemetry, invokeCount, label, + retried, middlewareManager, }: { qstashClient: WorkflowClient; @@ -180,6 +189,7 @@ export class WorkflowContext { telemetry?: Telemetry; invokeCount?: number; label?: string; + retried?: number; middlewareManager?: MiddlewareManager; }) { this.qstashClient = qstashClient; @@ -191,6 +201,7 @@ export class WorkflowContext { this.requestPayload = initialPayload; this.env = env ?? {}; this.label = label; + this.retried = retried ?? 0; const middlewareManagerInstance = middlewareManager ?? new MiddlewareManager([]); diff --git a/src/serve/authorization.ts b/src/serve/authorization.ts index c3affb8d..5e6b6edb 100644 --- a/src/serve/authorization.ts +++ b/src/serve/authorization.ts @@ -90,6 +90,7 @@ export class DisabledWorkflowContext< initialPayload: context.requestPayload, env: context.env, label: context.label, + retried: context.retried, }); try { diff --git a/src/serve/index.ts b/src/serve/index.ts index 88ecf356..60a0a68b 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -6,6 +6,7 @@ import { WORKFLOW_LABEL_HEADER, WORKFLOW_PROTOCOL_VERSION, WORKFLOW_PROTOCOL_VERSION_HEADER, + WORKFLOW_RETRIED_HEADER, } from "../constants"; import { WorkflowContext } from "../context"; import { @@ -186,6 +187,7 @@ export const serveBase = < } const invokeCount = Number(request.headers.get(WORKFLOW_INVOKE_COUNT_HEADER) ?? "0"); + const retried = Number(request.headers.get(WORKFLOW_RETRIED_HEADER) ?? "0"); const label = request.headers.get(WORKFLOW_LABEL_HEADER) ?? undefined; const workflowRunCreatedAt = request.headers.get(WORKFLOW_CREATED_AT_HEADER)!; @@ -203,6 +205,7 @@ export const serveBase = < telemetry, invokeCount, label, + retried, workflowRunCreatedAt: Number(workflowRunCreatedAt), middlewareManager, }); diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index ac9982ee..8c3faf6f 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -9,6 +9,7 @@ import { WORKFLOW_LABEL_HEADER, WORKFLOW_PROTOCOL_VERSION, WORKFLOW_PROTOCOL_VERSION_HEADER, + WORKFLOW_RETRIED_HEADER, WORKFLOW_UNKOWN_SDK_VERSION_HEADER, } from "./constants"; import type { @@ -414,6 +415,7 @@ export const handleFailure = async ({ } const userHeaders = recreateUserHeaders(request.headers as Headers); + const retried = Number(request.headers.get(WORKFLOW_RETRIED_HEADER) ?? "0"); // create context const workflowContext = new WorkflowContext({ @@ -428,6 +430,7 @@ export const handleFailure = async ({ env, telemetry: undefined, // not going to make requests in authentication check label: userHeaders.get(WORKFLOW_LABEL_HEADER) ?? undefined, + retried, workflowRunCreatedAt: workflowCreatedAt, middlewareManager: undefined, });