Skip to content

Commit e5f654b

Browse files
committed
wip
1 parent 6615ac6 commit e5f654b

8 files changed

Lines changed: 162 additions & 175 deletions

File tree

core/src/main/java/io/temporal/samples/taskinteraction/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
This example demonstrate a generic implementation for "User Tasks" interaction in Temporal.
44

5+
TODO
6+
57
Temporal does not have such concept of "human task", as BPM systems, but it can be easily implemented with
68
the pattern:
79
- The main workflow have an activity (or local activity) that send the request to an external service.

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public interface WorkflowTaskManager {
3030
@WorkflowMethod
3131
void execute(List<Task> inputPendingTask, List<String> inputTaskToComplete);
3232

33-
@SignalMethod
33+
@UpdateMethod
3434
void createTask(Task task);
3535

3636
@UpdateMethod

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class WorkflowTaskManagerImpl implements WorkflowTaskManager {
3030

3131
private List<Task> pendingTask;
3232

33-
private List<String> taskToComplete;
33+
private List<String> tasksToComplete;
3434

3535
@Override
3636
public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplete) {
@@ -46,23 +46,27 @@ public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplet
4646
// Wait until one task is added / removed
4747
currentTask.size() != pendingTask.size()
4848
// or there are pending task to complete
49-
|| !taskToComplete.isEmpty());
49+
|| !tasksToComplete.isEmpty());
5050

51-
if (!taskToComplete.isEmpty()) {
51+
if (!tasksToComplete.isEmpty()) {
5252

53-
final String taskToken = taskToComplete.remove(0);
54-
final String externalWorkflowId = new StringTokenizer(taskToken, "-").nextToken();
53+
System.out.println("tasksToComplete >>>>> " + tasksToComplete);
54+
55+
final String taskToken = tasksToComplete.remove(0);
56+
final String externalWorkflowId = new StringTokenizer(taskToken, "_").nextToken();
5557

5658
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
5759
.completeTaskByToken(taskToken);
5860

61+
System.out.println("getPendingTaskWithToken >>>>> " + taskToken);
5962
final Task task = getPendingTaskWithToken(taskToken).get();
63+
6064
pendingTask.remove(task);
6165
}
6266

6367
if (Workflow.getInfo().isContinueAsNewSuggested()) {
6468
Workflow.newContinueAsNewStub(WorkflowTaskManager.class)
65-
.execute(pendingTask, taskToComplete);
69+
.execute(pendingTask, tasksToComplete);
6670
}
6771
}
6872
}
@@ -73,10 +77,10 @@ private Optional<Task> getPendingTaskWithToken(final String taskToken) {
7377
}
7478

7579
private void initTaskToComplete(final List<String> tasks) {
76-
if (taskToComplete == null) {
77-
taskToComplete = new ArrayList<>();
80+
if (tasksToComplete == null) {
81+
tasksToComplete = new ArrayList<>();
7882
}
79-
taskToComplete.addAll(tasks);
83+
tasksToComplete.addAll(tasks);
8084
}
8185

8286
private void initPendingTasks(final List<Task> tasks) {
@@ -89,14 +93,19 @@ private void initPendingTasks(final List<Task> tasks) {
8993

9094
@Override
9195
public void createTask(Task task) {
96+
System.out.println("creating task " + task);
97+
9298
initPendingTasks(new ArrayList<>());
9399
pendingTask.add(task);
94100
}
95101

96102
@Override
97103
public void completeTaskByToken(String taskToken) {
98104

99-
taskToComplete.add(taskToken);
105+
System.out.println("completeTaskByToken adding taskToken >>>>> " + taskToken);
106+
System.out.println("completeTaskByToken >>>>> " + tasksToComplete);
107+
108+
tasksToComplete.add(taskToken);
100109

101110
Workflow.await(
102111
() -> {

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ private static class TaskToken {
7171

7272
public String getNext() {
7373

74-
return Workflow.getInfo().getWorkflowId() + "-" + taskToken++;
74+
return Workflow.getInfo().getWorkflowId() + "_" + taskToken++;
7575
}
7676
}
7777
}

core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTaskImpl.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,17 @@
1919

2020
package io.temporal.samples.taskinteraction.activity;
2121

22-
import static io.temporal.samples.taskinteraction.worker.Worker.TASK_QUEUE;
23-
22+
import io.temporal.activity.Activity;
2423
import io.temporal.client.WorkflowClient;
24+
import io.temporal.client.WorkflowExecutionAlreadyStarted;
2525
import io.temporal.client.WorkflowOptions;
26-
import io.temporal.client.WorkflowStub;
2726
import io.temporal.samples.taskinteraction.Task;
2827
import io.temporal.samples.taskinteraction.WorkflowTaskManager;
2928
import java.util.ArrayList;
3029

3130
public class ActivityTaskImpl implements ActivityTask {
3231

33-
private WorkflowClient workflowClient;
32+
private final WorkflowClient workflowClient;
3433

3534
public ActivityTaskImpl(WorkflowClient workflowClient) {
3635
this.workflowClient = workflowClient;
@@ -39,15 +38,23 @@ public ActivityTaskImpl(WorkflowClient workflowClient) {
3938
@Override
4039
public void createTask(Task task) {
4140

42-
WorkflowStub taskManager =
43-
workflowClient.newUntypedWorkflowStub(
44-
WorkflowTaskManager.class.getSimpleName(),
45-
WorkflowOptions.newBuilder()
46-
.setWorkflowId(WorkflowTaskManager.WORKFLOW_ID)
47-
.setTaskQueue(TASK_QUEUE)
48-
.build());
41+
final String taskQueue = Activity.getExecutionContext().getInfo().getActivityTaskQueue();
42+
43+
final WorkflowOptions workflowOptions =
44+
WorkflowOptions.newBuilder()
45+
.setWorkflowId(WorkflowTaskManager.WORKFLOW_ID)
46+
.setTaskQueue(taskQueue)
47+
.build();
48+
49+
final WorkflowTaskManager taskManager =
50+
workflowClient.newWorkflowStub(WorkflowTaskManager.class, workflowOptions);
51+
try {
52+
WorkflowClient.start(taskManager::execute, new ArrayList<>(), new ArrayList<>());
53+
} catch (WorkflowExecutionAlreadyStarted e) {
54+
// expected exception if workflow was started by a previous activity execution.
55+
// This will be handled differently once updateWithStart is implemented
56+
}
4957

50-
taskManager.signalWithStart(
51-
"createTask", new Object[] {task}, new Object[] {new ArrayList<>(), new ArrayList<>()});
58+
taskManager.createTask(task);
5259
}
5360
}

core/src/main/java/io/temporal/samples/taskinteraction/worker/Worker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
public class Worker {
3131

32-
public static final String TASK_QUEUE = "TaskWorkflowImplTaskQueue";
32+
public static final String TASK_QUEUE = "TaskInteractionQueue";
3333

3434
public static void main(String[] args) {
3535

core/src/test/java/io/temporal/samples/taskinteraction/TaskWorkflowImplTest.java_

Lines changed: 0 additions & 149 deletions
This file was deleted.

0 commit comments

Comments
 (0)