-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathwait-for-webhook.ts
More file actions
144 lines (124 loc) · 4.1 KB
/
Copy pathwait-for-webhook.ts
File metadata and controls
144 lines (124 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
* Wait for Webhook — Webhook-driven workflow pauses
*
* Demonstrates waitForWebhookTask for:
* - Pausing a workflow until an external webhook signal arrives
* - Matching webhooks to workflows using correlation
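* - Processing webhook payloads after resumption
*
* No real webhook is delivered in this example: the callback is simulated by
* completing the waiting task through the task client.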
*
* Run:
* CONDUCTOR_SERVER_URL=http://localhost:8080 npx ts-node examples/advanced/wait-for-webhook.ts
*/
import {
OrkesClients,
ConductorWorkflow,
TaskHandler,
worker,
simpleTask,
waitForWebhookTask,
} from "../../src/sdk";
import type { Task } from "../../src/open-api";
/**
 * Worker bound to the "wh_process_payload" task definition.
 *
 * Reads `webhookPayload` from the task input and echoes back its `paymentId`,
 * `status` and `amount` fields together with a `processed: true` marker.
 * Fields missing from the payload (or a missing payload) come back as
 * `undefined`; the task is always reported as COMPLETED.
 *
 * The binding is not referenced elsewhere in this file; presumably the call to
 * `worker(...)` is what makes the handler discoverable — confirm against the SDK.
 */
const _processWebhookPayload = worker({ taskDefName: "wh_process_payload", registerTaskDef: true })(
  async (task: Task) => {
    // The payload shape is not validated; it is treated as a loose key/value bag.
    const incoming = task.inputData?.webhookPayload as Record<string, unknown>;
    const paymentId = incoming?.paymentId;
    const status = incoming?.status;
    const amount = incoming?.amount;

    return {
      status: "COMPLETED",
      outputData: { processed: true, paymentId, status, amount },
    };
  }
);
/**
 * Returns a promise that settles after roughly `ms` milliseconds.
 *
 * @param ms - Delay in milliseconds, handed straight to `setTimeout`.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
async function sleep(ms: number) {
  return new Promise((done) => {
    // The timer callback is the resolver itself, so the promise resolves with no value.
    setTimeout(done, ms);
  });
}
/**
 * Runs the wait-for-webhook example end to end.
 *
 * 1. Builds and registers the "wait_for_webhook_example" workflow:
 *    process task → webhook wait → process task.
 * 2. Starts the task workers and one workflow instance for order
 *    "ORD-webhook-123".
 * 3. After a fixed 3 s delay, looks for the webhook wait task in IN_PROGRESS
 *    and, if found, completes it through the task client to simulate the
 *    external webhook callback.
 * 4. Prints the final workflow status and output, then exits with code 0.
 *
 * Workers are stopped in a `finally` block, so they are shut down even when a
 * step after `startWorkers()` throws; the error itself still propagates to the
 * caller.
 */
async function main(): Promise<void> {
  const clients = await OrkesClients.from();
  const workflowClient = clients.getWorkflowClient();
  const taskClient = clients.getTaskClient();
  const client = clients.getClient();

  // ── Build webhook-driven workflow ─────────────────────────────────
  const wf = new ConductorWorkflow(
    workflowClient,
    "wait_for_webhook_example"
  )
    .description("Pauses until external webhook signal arrives")
    .timeoutSeconds(3600);

  // Step 1: Initiate something (e.g., payment)
  wf.add(
    simpleTask("initiate_ref", "wh_process_payload", {
      webhookPayload: { action: "initiate", orderId: "${workflow.input.orderId}" },
    })
  );

  // Step 2: Wait for webhook (e.g., payment confirmation)
  wf.add(
    waitForWebhookTask("webhook_wait_ref", {
      matches: {
        "orderId": "${workflow.input.orderId}",
      },
    })
  );

  // Step 3: Process the webhook payload
  wf.add(
    simpleTask("process_ref", "wh_process_payload", {
      webhookPayload: "${webhook_wait_ref.output}",
    })
  );

  wf.outputParameters({
    orderId: "${workflow.input.orderId}",
    initiated: "${initiate_ref.output.processed}",
    webhookData: "${webhook_wait_ref.output}",
    finalResult: "${process_ref.output}",
  });

  await wf.register(true);
  console.log("Registered workflow:", wf.getName());

  // NOTE(review): scanForDecorated presumably picks up workers declared via
  // worker(...) in this module — confirm against the SDK.
  const handler = new TaskHandler({ client, scanForDecorated: true });
  await handler.startWorkers();

  try {
    // ── Start workflow — it will pause at webhook wait ────────────────
    const workflowId = await wf.startWorkflow({ orderId: "ORD-webhook-123" });
    console.log("Started workflow:", workflowId);
    console.log("Workflow will pause waiting for webhook...");

    // Wait for it to reach the webhook task. This is a fixed delay and a single
    // check: if step 1 has not finished within 3 s, the else-branch below runs.
    await sleep(3000);

    // NOTE(review): the second argument presumably asks for task details to be
    // included; the code relies on `status.tasks` right after — confirm.
    const status = await workflowClient.getWorkflow(workflowId, true);
    console.log("Current status:", status.status);

    // Find the waiting webhook task
    const webhookTask = status.tasks?.find(
      (t) =>
        t.taskReferenceName === "webhook_wait_ref" &&
        t.status === "IN_PROGRESS"
    );

    if (webhookTask?.taskId) {
      console.log("\nSimulating webhook callback...");

      // Simulate external webhook completing the task. The task is addressed by
      // workflow id + reference name; the object becomes the task's output.
      await taskClient.updateTaskResult(
        workflowId,
        "webhook_wait_ref",
        "COMPLETED",
        {
          paymentId: "PAY-456",
          status: "confirmed",
          amount: 99.99,
          provider: "stripe",
          timestamp: new Date().toISOString(),
        }
      );
      console.log("Webhook signal sent. Workflow continuing...");

      // Wait for completion
      await sleep(3000);
      const finalStatus = await workflowClient.getWorkflow(workflowId, true);
      console.log("\nFinal status:", finalStatus.status);
      console.log("Output:", JSON.stringify(finalStatus.output, null, 2));
    } else {
      console.log("Webhook task not found in expected state.");
    }
  } finally {
    // Always shut the workers down, including on the error path.
    await handler.stopWorkers();
  }

  process.exit(0);
}
/** Prints the error that aborted the example and ends the process with exit code 1. */
function exitWithFailure(reason: unknown): void {
  console.error(reason);
  process.exit(1);
}

// Script entry point: run the example; any rejection becomes a non-zero exit.
main().catch(exitWithFailure);