Skip to content

Commit db5e3b8

Browse files
committed
wip
1 parent 8b82f00 commit db5e3b8

7 files changed

Lines changed: 72 additions & 83 deletions

File tree

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,47 @@
11
# Demo tasks interaction
22

3-
This example demonstrate a generic implementation for "User Tasks" interaction in Temporal.
4-
5-
TODO
6-
7-
Temporal does not have such concept of "human task", as BPM systems, but it can be easily implemented with
8-
the pattern:
9-
- The main workflow have an activity (or local activity) that send the request to an external service.
10-
The external for this example is another workflow ([WorkflowTaskManagerImpl.java](WorkflowTaskManagerImpl.java)),
3+
This example demonstrates a generic implementation for "User Tasks" interaction with Temporal,
4+
which can be easily implemented as follows:
5+
- The main workflow [WorkflowWithTasks](./WorkflowWithTasks.java) have an activity (or local activity) that send the request to an external service.
6+
The _external service_, for this example, is another workflow ([WorkflowTaskManager](WorkflowTaskManager.java)),
117
that takes care of the task life-cicle.
12-
- The main workflow wait with `Workflow.await` to receive a Signal.The external service signal back the main
8+
- The main workflow waits, with `Workflow.await`, to receive a Signal.
9+
- The _external service_ signal back the main
1310
workflow to unblock it.
1411

15-
The three steps mentioned above are encapsulated in the class [TaskClient.java](./TaskClient.java)
12+
The two first steps mentioned above are encapsulated in the class [TaskService.java](./TaskService.java), to make it easily reusable.
1613

1714
## Run the sample
1815

19-
- Schedule the main workflow execution
16+
- Schedule the main workflow execution ([WorkflowWithTasks](./WorkflowWithTasks.java)), the one that contains the _User Tasks_
2017

2118
```bash
2219
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.StartWorkflow
2320
```
2421

25-
- Open other terminal and Start the Worker
22+
- Open other terminal and start the Worker
2623

2724
```bash
2825
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.worker.Worker
2926
```
3027

31-
The worker will start the workflow execution and schedule the two activities:
32-
28+
You will notice, from the worker logs, that it start the main workflow and execute two activities, the
29+
two activities register two tasks to the external service ([WorkflowTaskManagerImpl.java](WorkflowTaskManagerImpl.java))
3330

3431
```
35-
06:08:22.927 {WorkflowWithTasks0.25382038076376945 } [workflow[WorkflowWithTasks0.25382038076376945]-1] INFO i.t.s.taskinteraction.TaskService - Before creating task : Task{token='WorkflowWithTasks0.25382038076376945-1713845302806-1', title=TaskTitle{value='TODO 1'}}
36-
06:08:22.958 {WorkflowWithTasks0.25382038076376945 } [workflow[WorkflowWithTasks0.25382038076376945]-2] INFO i.t.s.taskinteraction.TaskService - Before creating task : Task{token='WorkflowWithTasks0.25382038076376945-1713845302806-2', title=TaskTitle{value='TODO 2'}}
37-
06:08:23.039 {WorkflowWithTasks0.25382038076376945 } [workflow[WorkflowWithTasks0.25382038076376945]-1] INFO i.t.s.taskinteraction.TaskService - Task created: Task{token='WorkflowWithTasks0.25382038076376945-1713845302806-1', title=TaskTitle{value='TODO 1'}}
38-
06:08:23.039 {WorkflowWithTasks0.25382038076376945 } [workflow[WorkflowWithTasks0.25382038076376945]-2] INFO i.t.s.taskinteraction.TaskService - Task created: Task{token='WorkflowWithTasks0.25382038076376945-1713845302806-2', title=TaskTitle{value='TODO 2'}}
39-
32+
07:19:39.528 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-1] INFO i.t.s.taskinteraction.TaskService - Before creating task : Task{token='WorkflowWithTasks1714454371179_1', title=TaskTitle{value='TODO 1'}}
33+
07:19:39.563 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-2] INFO i.t.s.taskinteraction.TaskService - Before creating task : Task{token='WorkflowWithTasks1714454371179_2', title=TaskTitle{value='TODO 2'}}
34+
07:19:39.683 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-1] INFO i.t.s.taskinteraction.TaskService - Task created: Task{token='WorkflowWithTasks1714454371179_1', title=TaskTitle{value='TODO 1'}}
35+
07:19:39.684 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-2] INFO i.t.s.taskinteraction.TaskService - Task created: Task{token='WorkflowWithTasks1714454371179_2', title=TaskTit
4036
```
4137

42-
- Complete task in the "External system". This class will query and complete one of the
43-
pending task in the external system, in this case a workflow, that will at the same time,
44-
signal back the main workflow (the one that created the task and is waiting)
38+
- Now, we can start completing the tasks using the helper class [CompleteTask.java](./client/CompleteTask.java)
4539

4640
```bash
4741
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.CompleteTask
4842
```
43+
You can see from the implementation that [WorkflowWithTasksImpl](./WorkflowWithTasksImpl.java) has three task.
44+
- two in parallel using `Async.procedure`,
45+
- one blocking task at the end.
4946

47+
This class needs to be run three times. After the three task are completed the main workflow completes.

core/src/main/java/io/temporal/samples/taskinteraction/TaskService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import java.util.Map;
2929
import org.slf4j.Logger;
3030

31+
/**
32+
* This class responsibility is to register the task in the external system and waits for the
33+
* external system to signal back.
34+
*/
3135
public class TaskService<R> {
3236

3337
private final ActivityTask activity =

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020
package io.temporal.samples.taskinteraction;
2121

2222
import io.temporal.workflow.Workflow;
23-
import java.util.ArrayList;
24-
import java.util.List;
25-
import java.util.Optional;
26-
import java.util.StringTokenizer;
27-
import org.jetbrains.annotations.NotNull;
23+
import java.util.*;
2824

2925
public class WorkflowTaskManagerImpl implements WorkflowTaskManager {
3026

@@ -39,27 +35,21 @@ public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplet
3935

4036
while (true) {
4137

42-
final List<Task> currentTask = new ArrayList<>(pendingTask);
43-
4438
Workflow.await(
4539
() ->
46-
// Wait until one task is added / removed
47-
currentTask.size() != pendingTask.size()
48-
// or there are pending task to complete
49-
|| !tasksToComplete.isEmpty());
50-
51-
if (!tasksToComplete.isEmpty()) {
40+
// Wait until there are pending task to complete
41+
!tasksToComplete.isEmpty());
5242

53-
final String taskToken = tasksToComplete.remove(0);
54-
final String externalWorkflowId = new StringTokenizer(taskToken, "_").nextToken();
43+
final String taskToken = tasksToComplete.remove(0);
5544

56-
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
57-
.completeTaskByToken(taskToken);
45+
// Find the workflow id of the workflow we have to signal back
46+
final String externalWorkflowId = new StringTokenizer(taskToken, "_").nextToken();
5847

59-
final Task task = getPendingTaskWithToken(taskToken).get();
48+
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
49+
.completeTaskByToken(taskToken);
6050

61-
pendingTask.remove(task);
62-
}
51+
final Task task = getPendingTaskWithToken(taskToken).get();
52+
pendingTask.remove(task);
6353

6454
if (Workflow.getInfo().isContinueAsNewSuggested()) {
6555
Workflow.newContinueAsNewStub(WorkflowTaskManager.class)
@@ -68,26 +58,6 @@ public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplet
6858
}
6959
}
7060

71-
@NotNull
72-
private Optional<Task> getPendingTaskWithToken(final String taskToken) {
73-
return pendingTask.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst();
74-
}
75-
76-
private void initTaskToComplete(final List<String> tasks) {
77-
if (tasksToComplete == null) {
78-
tasksToComplete = new ArrayList<>();
79-
}
80-
tasksToComplete.addAll(tasks);
81-
}
82-
83-
private void initPendingTasks(final List<Task> tasks) {
84-
85-
if (pendingTask == null) {
86-
pendingTask = new ArrayList<>();
87-
}
88-
pendingTask.addAll(tasks);
89-
}
90-
9161
@Override
9262
public void createTask(Task task) {
9363
initPendingTasks(new ArrayList<>());
@@ -102,7 +72,7 @@ public void completeTaskByToken(String taskToken) {
10272
Workflow.await(
10373
() -> {
10474
final boolean taskCompleted =
105-
getPendingTask().stream().filter((t) -> t.getToken() == taskToken).count() == 0;
75+
getPendingTask().stream().noneMatch((t) -> Objects.equals(t.getToken(), taskToken));
10676

10777
return taskCompleted;
10878
});
@@ -112,4 +82,23 @@ public void completeTaskByToken(String taskToken) {
11282
public List<Task> getPendingTask() {
11383
return pendingTask;
11484
}
85+
86+
private Optional<Task> getPendingTaskWithToken(final String taskToken) {
87+
return pendingTask.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst();
88+
}
89+
90+
private void initTaskToComplete(final List<String> tasks) {
91+
if (tasksToComplete == null) {
92+
tasksToComplete = new ArrayList<>();
93+
}
94+
tasksToComplete.addAll(tasks);
95+
}
96+
97+
private void initPendingTasks(final List<Task> tasks) {
98+
99+
if (pendingTask == null) {
100+
pendingTask = new ArrayList<>();
101+
}
102+
pendingTask.addAll(tasks);
103+
}
115104
}

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Arrays;
2626
import org.slf4j.Logger;
2727

28+
/** Workflow that creates three task and waits for them to complete */
2829
public class WorkflowWithTasksImpl implements WorkflowWithTasks {
2930

3031
private final Logger logger = Workflow.getLogger(WorkflowWithTasksImpl.class);
@@ -35,12 +36,10 @@ public class WorkflowWithTasksImpl implements WorkflowWithTasks {
3536
public void execute() {
3637

3738
// Schedule two "tasks" in parallel. The last parameter is the token the client needs
38-
// to change the task state, and to complete the task eventually
39-
39+
// to unblock/complete the task. This token contains the workflowId that the
40+
// client can use to create the workflow stub.
4041
final TaskToken taskToken = new TaskToken();
4142

42-
// Schedule two "tasks" in parallel. The last parameter is the token the client needs
43-
// to change the task state, and ultimately to complete the task
4443
logger.info("About to create async tasks");
4544
final Promise<Void> task1 =
4645
Async.procedure(

core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTaskImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public void createTask(Task task) {
5555
// This will be handled differently once updateWithStart is implemented
5656
}
5757

58+
// register the "task" to the external workflow that manages task lifecycle
5859
taskManager.createTask(task);
5960
}
6061
}

core/src/main/java/io/temporal/samples/taskinteraction/client/CompleteTask.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,36 +25,33 @@
2525
import io.temporal.serviceclient.WorkflowServiceStubs;
2626
import java.util.List;
2727

28+
/**
29+
* This class helps to complete tasks in the external workflow. Queries for pending task and
30+
* complete one of them
31+
*/
2832
public class CompleteTask {
2933

3034
public static void main(String[] args) throws InterruptedException {
3135

3236
final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
3337
final WorkflowClient client = WorkflowClient.newInstance(service);
3438

35-
while (true) {
36-
37-
// WorkflowTaskManager keeps and manage workflow task lifecycle
38-
final WorkflowTaskManager workflowTaskManager =
39-
client.newWorkflowStub(WorkflowTaskManager.class, WorkflowTaskManager.WORKFLOW_ID);
39+
// WorkflowTaskManager keeps and manage workflow task lifecycle
40+
final WorkflowTaskManager workflowTaskManager =
41+
client.newWorkflowStub(WorkflowTaskManager.class, WorkflowTaskManager.WORKFLOW_ID);
4042

41-
Thread.sleep(200);
42-
final List<Task> pendingTask = getPendingTask(workflowTaskManager);
43-
System.out.println("Pending task " + pendingTask);
43+
Thread.sleep(200);
44+
final List<Task> pendingTask = getPendingTask(workflowTaskManager);
45+
System.out.println("Pending task " + pendingTask);
4446

45-
if (pendingTask.isEmpty()) {
46-
// Thread.sleep(1000);
47-
continue;
48-
}
47+
if (!pendingTask.isEmpty()) {
4948

5049
final Task nextOpenTask = pendingTask.get(0);
5150
System.out.println("Completing task with token " + nextOpenTask);
5251
workflowTaskManager.completeTaskByToken(nextOpenTask.getToken());
53-
54-
System.out.println("Pending task " + getPendingTask(workflowTaskManager));
5552
}
5653

57-
// System.exit(0);
54+
System.out.println("Pending task " + getPendingTask(workflowTaskManager));
5855
}
5956

6057
private static List<Task> getPendingTask(final WorkflowTaskManager workflowTaskManager) {

core/src/main/java/io/temporal/samples/taskinteraction/client/StartWorkflow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.temporal.samples.taskinteraction.WorkflowWithTasks;
2727
import io.temporal.serviceclient.WorkflowServiceStubs;
2828

29+
/** Client that start schedule WorkflowWithTasks. */
2930
public class StartWorkflow {
3031

3132
public static void main(String[] args) throws InterruptedException {
@@ -43,7 +44,7 @@ public static void main(String[] args) throws InterruptedException {
4344

4445
System.out.println("Starting workflow: " + WorkflowWithTasks.WORKFLOW_ID);
4546

46-
// Execute workflow waiting for it to complete.
47+
// Schedule workflow and waiting for it to complete.
4748
workflow.execute();
4849

4950
System.out.println("Workflow completed: " + WorkflowWithTasks.WORKFLOW_ID);

0 commit comments

Comments
 (0)