Skip to content
65 changes: 65 additions & 0 deletions src/client/dlq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,35 @@ describe("DLQ", () => {
});
});

test("should surface both label and labels from the response", async () => {
const labelOne = "label-one";
const labelTwo = "label-two";
const message = {
...MOCK_DLQ_MESSAGES[0],
label: labelOne,
labels: [labelOne, labelTwo],
};

await mockQStashServer({
execute: async () => {
const result = await client.dlq.list();
// legacy `label` only carries the first label
expect(result.messages[0].label).toBe(labelOne);
// new `labels` carries all of them
expect(result.messages[0].labels).toEqual([labelOne, labelTwo]);
},
responseFields: {
status: 200,
body: { messages: [message], cursor: undefined },
},
receivesRequest: {
method: "GET",
url: `${MOCK_QSTASH_SERVER_URL}/v2/dlq?source=workflow`,
token,
},
});
});

test("should list DLQ messages with filter options", async () => {
const filter = {
fromDate: 1640995200000, // 2022-01-01
Expand Down Expand Up @@ -1067,6 +1096,42 @@ describe("DLQ", () => {
{ timeout: 180000 }
);

test(
"should return both label and labels on a DLQ message",
async () => {
const labelOne = `dlq-labels-1-${nanoid()}`;
const labelTwo = `dlq-labels-2-${nanoid()}`;

const { workflowRunId } = await liveClient.trigger({
url: "https://mock.httpstatus.io/500",
label: [labelOne, labelTwo],
retries: 0,
});

try {
await eventually(
async () => {
const { messages } = await liveClient.dlq.list({
count: 100,
filter: { label: [labelOne, labelTwo] },
});
const message = messages.find((m) => m.workflowRunId === workflowRunId);
expect(message).toBeDefined();
// legacy `label` only carries the first label
expect(message!.label).toBe(labelOne);
// new `labels` carries all of them
expect(message!.labels).toEqual([labelOne, labelTwo]);
},
{ timeout: 120000, interval: 2000 }
);
} finally {
// clean up so we don't leave noise in the DLQ
await liveClient.dlq.delete({ filter: { label: [labelOne, labelTwo] } }).catch(() => {});
}
},
{ timeout: 180000 }
);

test(
"should retry failure function of a DLQ message",
async () => {
Expand Down
12 changes: 11 additions & 1 deletion src/client/dlq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,18 @@ type DLQMessage = {
*/
failureCallbackInfo?: FailureCallbackInfo;
/**
* label passed when triggering workflow
* Label passed when triggering the workflow run.
*
* @deprecated Use `labels` instead. When a run has multiple labels, this
* field only contains the first one.
*/
label?: string;
/**
* Labels attached to the workflow run.
*
* A run can have multiple labels when triggered with `label: string[]`.
*/
labels?: string[];
};

type PublicDLQMessage = Pick<
Expand All @@ -68,6 +77,7 @@ type PublicDLQMessage = Pick<
| "failureCallback"
| "failureCallbackInfo"
| "label"
| "labels"
>;

function buildResumeRestartHeaders(options?: ResumeRestartOptions): Record<string, string> {
Expand Down
35 changes: 18 additions & 17 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ describe("workflow client", () => {
"upstash-retry-delay": "1000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-workflow-url": "https://wf-test.requestcatcher.com/api",
"upstash-delay": "1s",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
Expand All @@ -696,7 +696,7 @@ describe("workflow client", () => {
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
"upstash-workflow-sdk-version": "1",
"upstash-failure-callback-forward-upstash-label": "test-label",
"upstash-failure-callback": "https://requestcatcher.com/api",
"upstash-failure-callback": "https://wf-test.requestcatcher.com/api",
"upstash-failure-callback-feature-set":
"LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-failure-callback-forward-upstash-workflow-failure-callback": "true",
Expand All @@ -707,7 +707,7 @@ describe("workflow client", () => {
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
"upstash-failure-callback-workflow-url": "https://wf-test.requestcatcher.com/api",
},
body,
},
Expand Down Expand Up @@ -767,15 +767,15 @@ describe("workflow client", () => {
"upstash-retry-delay": "1000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-workflow-url": "https://wf-test.requestcatcher.com/api",
"upstash-delay": "1s",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
"upstash-workflow-sdk-version": "1",
"upstash-failure-callback": "https://requestcatcher.com/api",
"upstash-failure-callback": "https://wf-test.requestcatcher.com/api",
"upstash-failure-callback-feature-set":
"LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-failure-callback-forward-upstash-workflow-failure-callback": "true",
Expand All @@ -786,7 +786,7 @@ describe("workflow client", () => {
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
"upstash-failure-callback-workflow-url": "https://wf-test.requestcatcher.com/api",
},
body,
},
Expand All @@ -800,15 +800,15 @@ describe("workflow client", () => {
"upstash-retry-delay": "2000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId2}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-workflow-url": "https://wf-test.requestcatcher.com/api",
"upstash-not-before": "4102444800",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
"upstash-workflow-sdk-version": "1",
"upstash-failure-callback": "https://requestcatcher.com/api",
"upstash-failure-callback": "https://wf-test.requestcatcher.com/api",
"upstash-failure-callback-feature-set":
"LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-failure-callback-forward-upstash-workflow-failure-callback": "true",
Expand All @@ -819,7 +819,7 @@ describe("workflow client", () => {
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId2}`,
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
"upstash-failure-callback-workflow-url": "https://wf-test.requestcatcher.com/api",
},
body: body2,
},
Expand All @@ -841,7 +841,7 @@ describe("workflow client", () => {
retries: 15,
retryDelay: "1000",
delay: 1,
failureUrl: "https://requestcatcher.com/some-failure-callback",
failureUrl: "https://wf-test.requestcatcher.com/some-failure-callback",
flowControl: {
key: "failure-flow-key",
rate: 5,
Expand Down Expand Up @@ -869,9 +869,10 @@ describe("workflow client", () => {
"upstash-retry-delay": "1000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-workflow-url": "https://wf-test.requestcatcher.com/api",
"upstash-delay": "1s",
"upstash-failure-callback": "https://requestcatcher.com/some-failure-callback",
"upstash-failure-callback":
"https://wf-test.requestcatcher.com/some-failure-callback",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-failure-callback-feature-set":
Expand All @@ -884,7 +885,7 @@ describe("workflow client", () => {
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
"upstash-failure-callback-workflow-url": "https://wf-test.requestcatcher.com/api",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
Expand Down Expand Up @@ -944,7 +945,7 @@ describe("workflow client", () => {
body,
workflowRunId: myWorkflowRunId,
redact: { body: true, header: true },
failureUrl: "https://requestcatcher.com/some-failure-callback",
failureUrl: "https://wf-test.requestcatcher.com/some-failure-callback",
});
},
responseFields: {
Expand Down Expand Up @@ -1484,7 +1485,7 @@ describe("workflow client", () => {
"upstash-method": "POST",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-workflow-url": "https://wf-test.requestcatcher.com/api",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-forward-upstash-label": "valid_label.1",
Expand All @@ -1496,7 +1497,7 @@ describe("workflow client", () => {
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
"upstash-workflow-sdk-version": "1",
"upstash-failure-callback-forward-upstash-label": "valid_label.1",
"upstash-failure-callback": "https://requestcatcher.com/api",
"upstash-failure-callback": "https://wf-test.requestcatcher.com/api",
"upstash-failure-callback-feature-set":
"LazyFetch,InitialBody,WF_DetectTrigger,WF_TriggerOnConfig",
"upstash-failure-callback-flow-control-key": "valid-key_1.0",
Expand All @@ -1506,7 +1507,7 @@ describe("workflow client", () => {
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
"upstash-failure-callback-workflow-url": "https://wf-test.requestcatcher.com/api",
},
},
],
Expand Down
7 changes: 7 additions & 0 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ type StepLogGroup =
*/
steps: {
messageId: string;
/**
* name of the step being run.
*
* only available when the SDK reported it while the step was failing
* (via the `Upstash-Error-Step-Name` response header).
*/
stepName?: string;
state: "STEP_PROGRESS" | "STEP_RETRY" | "STEP_FAILED" | "STEP_CANCELED";
/**
* retries
Expand Down
3 changes: 2 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const WORKFLOW_RETRIED_HEADER = "Upstash-Retried";
export const WORKFLOW_LABEL_HEADER = "Upstash-Label";
export const WORKFLOW_UNKOWN_SDK_VERSION_HEADER = "Upstash-Workflow-Unknown-Sdk";
export const WORKFLOW_UNKOWN_SDK_TRIGGER_HEADER = "upstash-workflow-trigger-by-sdk";
export const WORKFLOW_ERROR_STEP_NAME_HEADER = "Upstash-Error-Step-Name";

export const WORKFLOW_PROTOCOL_VERSION = "1";
export const WORKFLOW_PROTOCOL_VERSION_HEADER = "Upstash-Workflow-Sdk-Version";
Expand All @@ -22,7 +23,7 @@ export const NO_CONCURRENCY = 1;
export const NOT_SET = "not-set";
export const DEFAULT_RETRIES = 3;

export const VERSION = "v1.2.1";
export const VERSION = "v1.3.2";
export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`;

export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const;
Expand Down
8 changes: 6 additions & 2 deletions src/context/auto-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isInstanceOf, WorkflowAbort, WorkflowError } from "../error";
import { attachStepNameToError, isInstanceOf, WorkflowAbort, WorkflowError } from "../error";
import type { WorkflowContext } from "./context";
import type { StepFunction, ParallelCallState, Step, Telemetry } from "../types";
import { type BaseLazyStep } from "./steps";
Expand Down Expand Up @@ -265,9 +265,13 @@ export class AutoExecutor {
) {
throw error;
}
throw new WorkflowError(
const wrappedError = new WorkflowError(
`Error submitting steps to QStash in partial parallel step execution: ${error}`
);
// preserve the failing step name through the re-wrap so it still
// reaches the error response header
attachStepNameToError(wrappedError, parallelSteps[stepIndex].stepName);
throw wrappedError;
}
break;
}
Expand Down
42 changes: 42 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,48 @@ export const formatWorkflowError = (error: unknown): FailureFunctionPayload => {
};
};

/**
* An error that may carry the name of the step that was executing when it was
* thrown. The serve handler reads this to report the failing step name to QStash
* via the `Upstash-Error-Step-Name` response header (so it shows up in Workflow Logs).
*/
type ErrorWithStepName = {
errorStepName?: string;
};

/**
* Records the name of the currently executing step on an error so it can be
* surfaced later when building the error response. The first recorded name wins,
* so the step closest to where the error originated is preserved.
*
* @param error error thrown while executing a step
* @param stepName name of the step that was executing
*/
export const attachStepNameToError = (error: unknown, stepName: string): void => {
if (typeof error !== "object" || error === null) return;
if ("errorStepName" in error) return;
if (!Object.isExtensible(error)) return;
try {
(error as ErrorWithStepName).errorStepName = stepName;
} catch {
// best-effort metadata only; never mask the original error
}
};

/**
* Reads the step name recorded by `attachStepNameToError`.
*
* @param error error to read the step name from
* @returns the recorded step name, or undefined if none was recorded
*/
export const getStepNameFromError = (error: unknown): string | undefined => {
if (typeof error === "object" && error !== null) {
const stepName = (error as ErrorWithStepName).errorStepName;
return typeof stepName === "string" ? stepName : undefined;
}
return undefined;
};

/**
* Gets the constructor name of an object.
*
Expand Down
14 changes: 11 additions & 3 deletions src/middleware/middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ describe("middleware", () => {
await context.sleep(stepThreeName, 10);
};

const qstashClient = new Client({ baseUrl: "https://requestcatcher.com", token: "token" });
const qstashClient = new Client({
baseUrl: "https://wf-test.requestcatcher.com",
token: "token",
});
qstashClient.http.request = jest.fn();

const runMiddlewareTest = async (
Expand All @@ -257,11 +260,16 @@ describe("middleware", () => {
) => {
const { middleware, accumulator } = createLoggingMiddleware();

const request = getRequest("https://requestcatcher.com", "wfr-id", undefined, steps);
const request = getRequest(
"https://wf-test.requestcatcher.com",
"wfr-id",
undefined,
steps
);

const { POST: handler } = serve(routeFunction, {
middlewares: [middleware],
url: "https://requestcatcher.com",
url: "https://wf-test.requestcatcher.com",
receiver: undefined,
qstashClient,
});
Expand Down
Loading
Loading