Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Install Dependencies
run: pnpm install
Expand Down Expand Up @@ -294,7 +294,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Install Dependencies
run: pnpm install
Expand Down Expand Up @@ -683,7 +683,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Deploy
run: |
Expand Down Expand Up @@ -714,7 +714,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Install example
run: |
Expand Down Expand Up @@ -768,7 +768,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Deploy
run: |
Expand Down Expand Up @@ -799,7 +799,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Deploy
run: |
Expand Down Expand Up @@ -830,7 +830,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Deploy
run: |
Expand Down Expand Up @@ -861,7 +861,7 @@ jobs:

- uses: pnpm/action-setup@v2
with:
version: latest
version: 10

- name: Deploy
run: |
Expand Down
3 changes: 3 additions & 0 deletions examples/ci/app/ci/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ export const TEST_ROUTES: Pick<TestConfig, RouteConfigs>[] = [
},
{
route: "invoke-parent-failure/workflow",
},
{
route: "retried",
}

/**
Expand Down
53 changes: 53 additions & 0 deletions examples/ci/app/test-routes/retried/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { serve } from "@upstash/workflow/nextjs";
import { BASE_URL } from "app/ci/constants";
import { testServe, expect } from "app/ci/utils";
import { saveResult } from "app/ci/upstash/redis"

const header = `test-header-foo`
const headerValue = `header-bar`
const payload = "retried-payload"

export const { POST, GET } = testServe(
serve<string>(
async (context) => {
const input = context.requestPayload;

expect(input, payload);
expect(context.headers.get(header)!, headerValue)

// context.retried reflects the QStash `Upstash-Retried` header for the
// current delivery. It must be a number.
expect(typeof context.retried, "number")

// On the first delivery of this step QStash sets Upstash-Retried: 0,
// we throw to force a retry. On the retry QStash sets it to 1 and we
// assert the new value and complete the step.
const observedRetried = await context.run("force-retry", () => {
if (context.retried === 0) {
throw new Error("forcing-retry")
}
return context.retried
})

expect(observedRetried, 1)

await saveResult(
context,
`retried=${observedRetried}`
)
}, {
baseUrl: BASE_URL,
}
), {
// first delivery + retried delivery + post-step continuation
expectedCallCount: 3,
expectedResult: "retried=1",
payload,
headers: {
[ header ]: headerValue
},
triggerConfig: {
retries: 1,
}
}
)
10 changes: 7 additions & 3 deletions examples/express/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ const ciRouter = serve(

app.post("/ci", ciRouter);

app.listen(3001, () => {
console.log('Server running on port 3001');
});
if (require.main === module) {
app.listen(3001, () => {
console.log('Server running on port 3001');
});
}

export default app;
23 changes: 16 additions & 7 deletions examples/express/ci.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { describe, test, expect } from "vitest"
import { RedisEntry } from "./api"
import "dotenv/config"

export const RETRY_COUNT = 10
export const RETRY_COUNT = 20
export const RETRY_INTERVAL_DURATION = 1000
export const CHECK_WF_AFTER_INIT_DURATION = 10000
const TEST_BUFFER_DURATION = 5000
Expand Down Expand Up @@ -157,13 +157,22 @@ const testEndpoint = ({
expect(result?.result).toBe(expectedResult)

if (expectedLog) {
const logs = await client.logs({
workflowRunId
})

let logs: Awaited<ReturnType<typeof client.logs>> | undefined
for (let i=1; i<=RETRY_COUNT; i++) {
logs = await client.logs({
workflowRunId
})
if (logs.runs.length === 1) {
break
}
if (i!==RETRY_COUNT) {
await new Promise(r => setTimeout(r, RETRY_INTERVAL_DURATION));
}
}

expect(logs).toBeDefined()
expect(logs.runs.length).toBe(1)
expect(logs.runs[0]).toEqual(expectedLog)
expect(logs!.runs.length).toBe(1)
expect(logs!.runs[0]).toEqual(expectedLog)
}
}, TEST_TIMEOUT_DURATION)
}
Expand Down
23 changes: 16 additions & 7 deletions examples/nextjs-pages/ci.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Redis } from "@upstash/redis"
import { serve } from "@upstash/workflow/nextjs"
import { describe, test, expect } from "vitest"

export const RETRY_COUNT = 10
export const RETRY_COUNT = 20
export const RETRY_INTERVAL_DURATION = 1000
export const CHECK_WF_AFTER_INIT_DURATION = 10000
const TEST_BUFFER_DURATION = 5000
Expand Down Expand Up @@ -169,13 +169,22 @@ const testEndpoint = ({
expect(result?.result).toBe(expectedResult)

if (expectedLog) {
const logs = await client.logs({
workflowRunId
})

let logs: Awaited<ReturnType<typeof client.logs>> | undefined
for (let i=1; i<=RETRY_COUNT; i++) {
logs = await client.logs({
workflowRunId
})
if (logs.runs.length === 1) {
break
}
if (i!==RETRY_COUNT) {
await new Promise(r => setTimeout(r, RETRY_INTERVAL_DURATION));
}
}

expect(logs).toBeDefined()
expect(logs.runs.length).toBe(1)
expect(logs.runs[0]).toEqual(expectedLog)
expect(logs!.runs.length).toBe(1)
expect(logs!.runs[0]).toEqual(expectedLog)
}
}, TEST_TIMEOUT_DURATION)
}
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export const WORKFLOW_FAILURE_HEADER = "Upstash-Workflow-Is-Failure";
export const WORKFLOW_FAILURE_CALLBACK_HEADER = "Upstash-Workflow-Failure-Callback";
export const WORKFLOW_FEATURE_HEADER = "Upstash-Feature-Set";
export const WORKFLOW_INVOKE_COUNT_HEADER = "Upstash-Workflow-Invoke-Count";
export const WORKFLOW_RETRIED_HEADER = "Upstash-Retried";
export const WORKFLOW_LABEL_HEADER = "Upstash-Label";
export const WORKFLOW_UNKOWN_SDK_VERSION_HEADER = "Upstash-Workflow-Unknown-Sdk";
export const WORKFLOW_UNKOWN_SDK_TRIGGER_HEADER = "upstash-workflow-trigger-by-sdk";
Expand Down
11 changes: 11 additions & 0 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ export class WorkflowContext<TInitialPayload = unknown> {
*/
public readonly label?: string;

/**
* Number of times QStash has retried delivering the current request.
*
* Sourced from the `Upstash-Retried` header. `0` on the first delivery,
* `1` on the first retry, `2` on the second, and so on.
*/
public readonly retried: number;

constructor({
qstashClient,
workflowRunId,
Expand All @@ -167,6 +175,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
telemetry,
invokeCount,
label,
retried,
middlewareManager,
}: {
qstashClient: WorkflowClient;
Expand All @@ -180,6 +189,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
telemetry?: Telemetry;
invokeCount?: number;
label?: string;
retried?: number;
middlewareManager?: MiddlewareManager<TInitialPayload>;
}) {
this.qstashClient = qstashClient;
Expand All @@ -191,6 +201,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
this.requestPayload = initialPayload;
this.env = env ?? {};
this.label = label;
this.retried = retried ?? 0;

const middlewareManagerInstance =
middlewareManager ?? new MiddlewareManager<TInitialPayload, unknown>([]);
Expand Down
1 change: 1 addition & 0 deletions src/serve/authorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export class DisabledWorkflowContext<
initialPayload: context.requestPayload,
env: context.env,
label: context.label,
retried: context.retried,
});

try {
Expand Down
3 changes: 3 additions & 0 deletions src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
WORKFLOW_LABEL_HEADER,
WORKFLOW_PROTOCOL_VERSION,
WORKFLOW_PROTOCOL_VERSION_HEADER,
WORKFLOW_RETRIED_HEADER,
} from "../constants";
import { WorkflowContext } from "../context";
import {
Expand Down Expand Up @@ -186,6 +187,7 @@ export const serveBase = <
}

const invokeCount = Number(request.headers.get(WORKFLOW_INVOKE_COUNT_HEADER) ?? "0");
const retried = Number(request.headers.get(WORKFLOW_RETRIED_HEADER) ?? "0");
const label = request.headers.get(WORKFLOW_LABEL_HEADER) ?? undefined;
const workflowRunCreatedAt = request.headers.get(WORKFLOW_CREATED_AT_HEADER)!;

Expand All @@ -203,6 +205,7 @@ export const serveBase = <
telemetry,
invokeCount,
label,
retried,
workflowRunCreatedAt: Number(workflowRunCreatedAt),
middlewareManager,
});
Expand Down
3 changes: 3 additions & 0 deletions src/workflow-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
WORKFLOW_LABEL_HEADER,
WORKFLOW_PROTOCOL_VERSION,
WORKFLOW_PROTOCOL_VERSION_HEADER,
WORKFLOW_RETRIED_HEADER,
WORKFLOW_UNKOWN_SDK_VERSION_HEADER,
} from "./constants";
import type {
Expand Down Expand Up @@ -414,6 +415,7 @@ export const handleFailure = async <TInitialPayload>({
}

const userHeaders = recreateUserHeaders(request.headers as Headers);
const retried = Number(request.headers.get(WORKFLOW_RETRIED_HEADER) ?? "0");

// create context
const workflowContext = new WorkflowContext<TInitialPayload>({
Expand All @@ -428,6 +430,7 @@ export const handleFailure = async <TInitialPayload>({
env,
telemetry: undefined, // not going to make requests in authentication check
label: userHeaders.get(WORKFLOW_LABEL_HEADER) ?? undefined,
retried,
workflowRunCreatedAt: workflowCreatedAt,
middlewareManager: undefined,
});
Expand Down
Loading