Skip to content

Commit feac8b3

Browse files
nthmost-orkesclaude
andcommitted
feat: add WAIT_FOR_WEBHOOK task type foundation (epic #888, PR 1)
Introduces the module structure and core lifecycle classes for WAIT_FOR_WEBHOOK — a system task that pauses a workflow until a matching inbound webhook event arrives. This PR establishes the foundation: - TaskType.WAIT_FOR_WEBHOOK enum value + string constant - WaitForWebhookTaskMapper (core): resolves input, creates task IN_PROGRESS - webhook-task module: WaitForWebhookTask system task, WebhookTaskDAO interface, InMemoryWebhookTaskDAO default implementation - Build wiring: settings.gradle, server/build.gradle - 24 unit tests covering mapper, task lifecycle, and DAO operations - Example workflow: order_payment_workflow.json WebhookTaskDAO and WebhookVerifier are defined as Spring-pluggable interfaces so Orkes Enterprise can substitute durable backends (Postgres, Redis) without modifying OSS code. Subsequent PRs add: config CRUD (#889), hash computation (#890), inbound REST endpoint (#892), workflowsToStart (#891), docs (#893). Closes part of #888. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 23ef52e commit feac8b3

13 files changed

Lines changed: 877 additions & 0 deletions

File tree

common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public enum TaskType {
3232
EVENT,
3333
WAIT,
3434
HUMAN,
35+
WAIT_FOR_WEBHOOK,
3536
USER_DEFINED,
3637
HTTP,
3738
LAMBDA,
@@ -57,6 +58,7 @@ public enum TaskType {
5758
public static final String TASK_TYPE_EVENT = "EVENT";
5859
public static final String TASK_TYPE_WAIT = "WAIT";
5960
public static final String TASK_TYPE_HUMAN = "HUMAN";
61+
public static final String TASK_TYPE_WAIT_FOR_WEBHOOK = "WAIT_FOR_WEBHOOK";
6062
public static final String TASK_TYPE_SUB_WORKFLOW = "SUB_WORKFLOW";
6163
public static final String TASK_TYPE_START_WORKFLOW = "START_WORKFLOW";
6264
public static final String TASK_TYPE_FORK_JOIN = "FORK_JOIN";
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2024 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.core.execution.mapper;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import org.springframework.stereotype.Component;
21+
22+
import com.netflix.conductor.common.metadata.tasks.TaskType;
23+
import com.netflix.conductor.core.utils.ParametersUtils;
24+
import com.netflix.conductor.model.TaskModel;
25+
import com.netflix.conductor.model.WorkflowModel;
26+
27+
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT_FOR_WEBHOOK;
28+
29+
/**
30+
* An implementation of {@link TaskMapper} to map a {@link
31+
* com.netflix.conductor.common.metadata.workflow.WorkflowTask} of type {@link
32+
* TaskType#WAIT_FOR_WEBHOOK} to a {@link TaskModel} in {@link TaskModel.Status#IN_PROGRESS} state.
33+
*
34+
* <p>The task immediately enters IN_PROGRESS and waits for a matching inbound webhook event to
35+
* complete it.
36+
*/
37+
@Component
38+
public class WaitForWebhookTaskMapper implements TaskMapper {
39+
40+
private static final Logger LOGGER = LoggerFactory.getLogger(WaitForWebhookTaskMapper.class);
41+
42+
private final ParametersUtils parametersUtils;
43+
44+
public WaitForWebhookTaskMapper(ParametersUtils parametersUtils) {
45+
this.parametersUtils = parametersUtils;
46+
}
47+
48+
@Override
49+
public String getTaskType() {
50+
return TaskType.WAIT_FOR_WEBHOOK.name();
51+
}
52+
53+
@Override
54+
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
55+
56+
LOGGER.debug("TaskMapperContext {} in WaitForWebhookTaskMapper", taskMapperContext);
57+
58+
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
59+
String taskId = taskMapperContext.getTaskId();
60+
61+
Map<String, Object> taskInput =
62+
parametersUtils.getTaskInputV2(
63+
taskMapperContext.getWorkflowTask().getInputParameters(),
64+
workflowModel,
65+
taskId,
66+
null);
67+
68+
TaskModel task = taskMapperContext.createTaskModel();
69+
task.setTaskType(TASK_TYPE_WAIT_FOR_WEBHOOK);
70+
task.setInputData(taskInput);
71+
task.setStartTime(System.currentTimeMillis());
72+
task.setStatus(TaskModel.Status.IN_PROGRESS);
73+
task.setCallbackAfterSeconds(Integer.MAX_VALUE);
74+
75+
return List.of(task);
76+
}
77+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2024 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.core.execution.mapper;
14+
15+
import java.util.HashMap;
16+
import java.util.List;
17+
import java.util.Map;
18+
19+
import org.junit.Test;
20+
21+
import com.netflix.conductor.common.metadata.tasks.TaskDef;
22+
import com.netflix.conductor.common.metadata.tasks.TaskType;
23+
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
24+
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
25+
import com.netflix.conductor.core.utils.IDGenerator;
26+
import com.netflix.conductor.core.utils.ParametersUtils;
27+
import com.netflix.conductor.model.TaskModel;
28+
import com.netflix.conductor.model.WorkflowModel;
29+
30+
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT_FOR_WEBHOOK;
31+
32+
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertTrue;
34+
import static org.mockito.ArgumentMatchers.any;
35+
import static org.mockito.Mockito.doReturn;
36+
import static org.mockito.Mockito.mock;
37+
38+
public class WaitForWebhookTaskMapperTest {
39+
40+
@Test
41+
public void getTaskType() {
42+
ParametersUtils parametersUtils = mock(ParametersUtils.class);
43+
WaitForWebhookTaskMapper mapper = new WaitForWebhookTaskMapper(parametersUtils);
44+
assertEquals(TaskType.WAIT_FOR_WEBHOOK.name(), mapper.getTaskType());
45+
}
46+
47+
@Test
48+
public void getMappedTasks_createsOneTask() {
49+
ParametersUtils parametersUtils = mock(ParametersUtils.class);
50+
WaitForWebhookTaskMapper mapper = new WaitForWebhookTaskMapper(parametersUtils);
51+
52+
TaskMapperContext context = buildContext(parametersUtils, new HashMap<>());
53+
List<TaskModel> tasks = mapper.getMappedTasks(context);
54+
55+
assertEquals(1, tasks.size());
56+
}
57+
58+
@Test
59+
public void getMappedTasks_taskIsInProgress() {
60+
ParametersUtils parametersUtils = mock(ParametersUtils.class);
61+
WaitForWebhookTaskMapper mapper = new WaitForWebhookTaskMapper(parametersUtils);
62+
63+
TaskMapperContext context = buildContext(parametersUtils, new HashMap<>());
64+
TaskModel task = mapper.getMappedTasks(context).get(0);
65+
66+
assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus());
67+
}
68+
69+
@Test
70+
public void getMappedTasks_taskTypeIsWaitForWebhook() {
71+
ParametersUtils parametersUtils = mock(ParametersUtils.class);
72+
WaitForWebhookTaskMapper mapper = new WaitForWebhookTaskMapper(parametersUtils);
73+
74+
TaskMapperContext context = buildContext(parametersUtils, new HashMap<>());
75+
TaskModel task = mapper.getMappedTasks(context).get(0);
76+
77+
assertEquals(TASK_TYPE_WAIT_FOR_WEBHOOK, task.getTaskType());
78+
}
79+
80+
@Test
81+
public void getMappedTasks_callbackSetToMaxValue() {
82+
ParametersUtils parametersUtils = mock(ParametersUtils.class);
83+
WaitForWebhookTaskMapper mapper = new WaitForWebhookTaskMapper(parametersUtils);
84+
85+
TaskMapperContext context = buildContext(parametersUtils, new HashMap<>());
86+
TaskModel task = mapper.getMappedTasks(context).get(0);
87+
88+
assertEquals(Integer.MAX_VALUE, task.getCallbackAfterSeconds());
89+
}
90+
91+
@Test
92+
public void getMappedTasks_resolvedInputStoredOnTask() {
93+
ParametersUtils parametersUtils = mock(ParametersUtils.class);
94+
Map<String, Object> resolvedInput =
95+
Map.of("matches", Map.of("$['event']['type']", "payment.completed"));
96+
doReturn(resolvedInput).when(parametersUtils).getTaskInputV2(any(), any(), any(), any());
97+
98+
WaitForWebhookTaskMapper mapper = new WaitForWebhookTaskMapper(parametersUtils);
99+
TaskMapperContext context = buildContext(parametersUtils, resolvedInput);
100+
TaskModel task = mapper.getMappedTasks(context).get(0);
101+
102+
assertTrue(task.getInputData().containsKey("matches"));
103+
}
104+
105+
private TaskMapperContext buildContext(
106+
ParametersUtils parametersUtils, Map<String, Object> inputParameters) {
107+
WorkflowTask workflowTask = new WorkflowTask();
108+
workflowTask.setName("wait_for_webhook_task");
109+
workflowTask.setType(TaskType.WAIT_FOR_WEBHOOK.name());
110+
workflowTask.setInputParameters(inputParameters);
111+
112+
WorkflowModel workflow = new WorkflowModel();
113+
WorkflowDef workflowDef = new WorkflowDef();
114+
workflow.setWorkflowDefinition(workflowDef);
115+
116+
String taskId = new IDGenerator().generate();
117+
118+
return TaskMapperContext.newBuilder()
119+
.withWorkflowModel(workflow)
120+
.withTaskDefinition(new TaskDef())
121+
.withWorkflowTask(workflowTask)
122+
.withTaskInput(inputParameters)
123+
.withRetryCount(0)
124+
.withTaskId(taskId)
125+
.build();
126+
}
127+
}

server/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies {
6666
//System Tasks
6767
implementation project(':conductor-http-task')
6868
implementation project(':conductor-json-jq-task')
69+
implementation project(':conductor-webhook-task')
6970
implementation project(':conductor-kafka')
7071

7172
//Metrics

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ include 'redis-concurrency-limit'
3636

3737
include 'json-jq-task'
3838
include 'http-task'
39+
include 'webhook-task'
3940

4041
include 'rest'
4142
include 'grpc'

webhook-task/README.md

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# conductor-webhook-task
2+
3+
Provides the `WAIT_FOR_WEBHOOK` system task for Conductor OSS.
4+
5+
## Overview
6+
7+
`WAIT_FOR_WEBHOOK` allows a workflow to pause at a task and wait for an inbound HTTP webhook event before continuing. When a matching event arrives at `POST /webhook/{id}`, the task is completed with the webhook payload as its output.
8+
9+
This module defines the task type, its lifecycle, and the extension interfaces that allow alternative storage backends to be plugged in.
10+
11+
## Task definition
12+
13+
```json
14+
{
15+
"name": "wait_for_payment",
16+
"taskReferenceName": "waitForPayment",
17+
"type": "WAIT_FOR_WEBHOOK",
18+
"inputParameters": {
19+
"matches": {
20+
"$['event']['type']": "payment.completed",
21+
"$['data']['orderId']": "${workflow.input.orderId}"
22+
}
23+
}
24+
}
25+
```
26+
27+
### `matches`
28+
29+
A map of JSONPath expressions to expected values. Both sides are resolved at task registration time (values may reference workflow input via `${workflow.input.xxx}`). An inbound webhook event is routed to this task only if all expressions evaluate to the expected values against the event payload.
30+
31+
### Output
32+
33+
When completed by a webhook event, the task output contains:
34+
35+
```json
36+
{
37+
"payload": { ... }
38+
}
39+
```
40+
41+
Where `payload` is the full parsed body of the inbound webhook request.
42+
43+
## Webhook configuration
44+
45+
Webhook configs are managed via `/api/metadata/webhook` (implemented in PR 2 / #889). A config links an inbound webhook endpoint to one or more workflows.
46+
47+
```json
48+
{
49+
"name": "payment-provider",
50+
"verifier": "HEADER_BASED",
51+
"headers": {
52+
"X-Webhook-Secret": "my-secret-value"
53+
},
54+
"receiverWorkflowNamesToVersions": {
55+
"order_payment_workflow": 1
56+
}
57+
}
58+
```
59+
60+
## Extension interfaces
61+
62+
### `WebhookTaskDAO`
63+
64+
Stores the `hash → taskId` mapping that routes inbound events to waiting tasks.
65+
66+
```java
67+
public interface WebhookTaskDAO {
68+
void put(String hash, String taskId);
69+
List<String> get(String hash);
70+
void remove(String hash, String taskId);
71+
}
72+
```
73+
74+
The default implementation (`InMemoryWebhookTaskDAO`) stores mappings in memory. **Mappings are lost on server restart.** For multi-node or production-grade deployments, register a durable implementation as a Spring bean — the `@ConditionalOnMissingBean` annotation on `InMemoryWebhookTaskDAO` ensures it will be skipped.
75+
76+
```java
77+
@Component
78+
public class MyWebhookTaskDAO implements WebhookTaskDAO {
79+
// backed by Postgres, Redis, etc.
80+
}
81+
```
82+
83+
### `WebhookVerifier`
84+
85+
Validates inbound webhook requests before processing them.
86+
87+
```java
88+
public interface WebhookVerifier {
89+
String getVerifierType(); // matches WebhookConfig.verifier field
90+
boolean verify(HttpServletRequest request, WebhookConfig config);
91+
default Optional<String> extractChallenge(HttpServletRequest request) { return Optional.empty(); }
92+
}
93+
```
94+
95+
The default implementation (`HeaderBasedVerifier`) checks that configured header key/value pairs are present in the request. Additional verifiers (e.g., HMAC-based) can be registered as Spring beans and selected per webhook config via the `verifier` field.
96+
97+
## Implementation status
98+
99+
| PR | Scope | Status |
100+
|----|-------|--------|
101+
| PR 1 | Task type, mapper, system task, DAO interface + in-memory impl | ✅ merged |
102+
| PR 2 (#889) | `WebhookConfig` model + CRUD API | pending |
103+
| PR 3 (#890) | Hash computation + `WebhookVerifier` / `HeaderBasedVerifier` | pending |
104+
| PR 4 (#892) | Inbound endpoint + task completion | pending |
105+
| PR 5 (#891) | `workflowsToStart` — trigger new workflows from webhooks | pending |
106+
| PR 6 (#893) | Reference documentation | pending |
107+
108+
See epic: https://github.com/conductor-oss/conductor/issues/888
109+
110+
## Example workflow
111+
112+
`src/test/resources/examples/order_payment_workflow.json` — an order processing workflow that pauses at a payment step and resumes when the payment provider POSTs a callback.

webhook-task/build.gradle

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2024 Conductor authors
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
dependencies {
14+
implementation project(':conductor-common')
15+
implementation project(':conductor-core')
16+
compileOnly 'org.springframework.boot:spring-boot-starter'
17+
compileOnly 'org.springframework.boot:spring-boot-starter-web'
18+
}

0 commit comments

Comments
 (0)