Skip to content

Commit 8b82f00

Browse files
committed
wip
1 parent e5f654b commit 8b82f00

3 files changed

Lines changed: 80 additions & 29 deletions

File tree

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,12 @@ public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplet
5050

5151
if (!tasksToComplete.isEmpty()) {
5252

53-
System.out.println("tasksToComplete >>>>> " + tasksToComplete);
54-
5553
final String taskToken = tasksToComplete.remove(0);
5654
final String externalWorkflowId = new StringTokenizer(taskToken, "_").nextToken();
5755

5856
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
5957
.completeTaskByToken(taskToken);
6058

61-
System.out.println("getPendingTaskWithToken >>>>> " + taskToken);
6259
final Task task = getPendingTaskWithToken(taskToken).get();
6360

6461
pendingTask.remove(task);
@@ -93,18 +90,13 @@ private void initPendingTasks(final List<Task> tasks) {
9390

9491
@Override
9592
public void createTask(Task task) {
96-
System.out.println("creating task " + task);
97-
9893
initPendingTasks(new ArrayList<>());
9994
pendingTask.add(task);
10095
}
10196

10297
@Override
10398
public void completeTaskByToken(String taskToken) {
10499

105-
System.out.println("completeTaskByToken adding taskToken >>>>> " + taskToken);
106-
System.out.println("completeTaskByToken >>>>> " + tasksToComplete);
107-
108100
tasksToComplete.add(taskToken);
109101

110102
Workflow.await(

core/src/main/java/io/temporal/samples/taskinteraction/client/StartWorkflow.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,6 @@ public static void main(String[] args) throws InterruptedException {
3333
final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
3434
final WorkflowClient client = WorkflowClient.newInstance(service);
3535

36-
extracted(client);
37-
while (true) {
38-
39-
extracted(client);
40-
41-
Thread.sleep(200);
42-
}
43-
}
44-
45-
private static void extracted(final WorkflowClient client) {
4636
final WorkflowWithTasks workflow =
4737
client.newWorkflowStub(
4838
WorkflowWithTasks.class,
@@ -51,9 +41,13 @@ private static void extracted(final WorkflowClient client) {
5141
.setTaskQueue(TASK_QUEUE)
5242
.build());
5343

54-
System.out.println("Starting workflow " + WorkflowWithTasks.WORKFLOW_ID);
44+
System.out.println("Starting workflow: " + WorkflowWithTasks.WORKFLOW_ID);
5545

5646
// Execute workflow waiting for it to complete.
57-
WorkflowClient.execute(workflow::execute);
47+
workflow.execute();
48+
49+
System.out.println("Workflow completed: " + WorkflowWithTasks.WORKFLOW_ID);
50+
51+
System.exit(0);
5852
}
5953
}

core/src/test/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImplTest.java

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,27 @@
2727
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
2828
import io.temporal.client.WorkflowClient;
2929
import io.temporal.client.WorkflowOptions;
30+
import io.temporal.common.interceptors.*;
3031
import io.temporal.samples.taskinteraction.activity.ActivityTaskImpl;
3132
import io.temporal.testing.TestWorkflowRule;
32-
import java.time.Duration;
33+
import io.temporal.worker.WorkerFactoryOptions;
34+
import io.temporal.workflow.Workflow;
3335
import java.util.List;
36+
import java.util.concurrent.CompletableFuture;
3437
import org.junit.Rule;
3538
import org.junit.Test;
3639

3740
public class WorkflowWithTasksImplTest {
3841

42+
private MyWorkerInterceptor myWorkerInterceptor = new MyWorkerInterceptor();
43+
3944
@Rule
4045
public TestWorkflowRule testWorkflowRule =
4146
TestWorkflowRule.newBuilder()
42-
// .setNamespace("default")
43-
// .setUseExternalService(true)
47+
.setWorkerFactoryOptions(
48+
WorkerFactoryOptions.newBuilder()
49+
.setWorkerInterceptors(myWorkerInterceptor)
50+
.validateAndBuildWithDefaults())
4451
.setDoNotStart(true)
4552
.build();
4653

@@ -56,6 +63,7 @@ public void testEnd2End() {
5663
testWorkflowRule
5764
.getWorker()
5865
.registerActivitiesImplementations(new ActivityTaskImpl(workflowClient));
66+
5967
testWorkflowRule.getTestEnvironment().start();
6068

6169
WorkflowWithTasks workflow =
@@ -70,23 +78,25 @@ public void testEnd2End() {
7078

7179
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
7280

73-
// TODO
74-
testWorkflowRule.getTestEnvironment().sleep(Duration.ofSeconds(2));
81+
// Wait until the first two tasks from WorkflowWithTasks are created in WorkflowTaskManager
82+
myWorkerInterceptor.waitUntilTwoCreateTaskInvocations();
7583

7684
WorkflowTaskManager workflowManager =
7785
workflowClient.newWorkflowStub(WorkflowTaskManager.class, WorkflowTaskManager.WORKFLOW_ID);
7886

7987
final List<Task> pendingTask = getPendingTask(workflowManager);
8088
assertEquals(2, pendingTask.size());
8189

82-
// Let's complete the two pending task. Send update to the workflow that holds and keep tasks
83-
// state
90+
// Complete the two pending task created in parallel from `WorkflowWithTasks`.
91+
// Send update to the workflow that keeps tasks state, that will signal back
92+
// the `WorkflowWithTasks` execution
8493
workflowManager.completeTaskByToken(pendingTask.get(0).getToken());
8594
workflowManager.completeTaskByToken(pendingTask.get(1).getToken());
8695

87-
// TODO
88-
testWorkflowRule.getTestEnvironment().sleep(Duration.ofSeconds(2));
96+
// Wait until the last task in WorkflowWithTasks is created in WorkflowTaskManager
97+
myWorkerInterceptor.waitUntilThreeInvocationsOfCreateTask();
8998

99+
// Complete the last task in `WorkflowWithTasks`
90100
assertEquals(1, getPendingTask(workflowManager).size());
91101
workflowManager.completeTaskByToken(getPendingTask(workflowManager).get(0).getToken());
92102

@@ -115,4 +125,59 @@ private DescribeWorkflowExecutionResponse getDescribeWorkflowExecutionResponse(
115125
private static List<Task> getPendingTask(final WorkflowTaskManager workflowManager) {
116126
return workflowManager.getPendingTask();
117127
}
128+
129+
private class MyWorkerInterceptor extends WorkerInterceptorBase {
130+
131+
private int createTaskInvocations = 0;
132+
133+
private CompletableFuture<Void> waitUntilTwoInvocationsOfCreateTask;
134+
135+
private CompletableFuture<Void> waitUntilThreeInvocationsOfCreateTask;
136+
137+
public MyWorkerInterceptor() {
138+
waitUntilTwoInvocationsOfCreateTask = new CompletableFuture<>();
139+
waitUntilThreeInvocationsOfCreateTask = new CompletableFuture<>();
140+
}
141+
142+
public Void waitUntilTwoCreateTaskInvocations() {
143+
return getFromCompletableFuture(waitUntilTwoInvocationsOfCreateTask);
144+
}
145+
146+
public Void waitUntilThreeInvocationsOfCreateTask() {
147+
return getFromCompletableFuture(waitUntilThreeInvocationsOfCreateTask);
148+
}
149+
150+
private Void getFromCompletableFuture(final CompletableFuture<Void> completableFuture) {
151+
try {
152+
return completableFuture.get();
153+
} catch (Exception e) {
154+
throw new RuntimeException(e);
155+
}
156+
}
157+
158+
@Override
159+
public WorkflowInboundCallsInterceptor interceptWorkflow(
160+
final WorkflowInboundCallsInterceptor next) {
161+
return new WorkflowInboundCallsInterceptorBase(next) {
162+
@Override
163+
public UpdateOutput executeUpdate(final UpdateInput input) {
164+
if (input.getUpdateName().equals("createTask")
165+
&& Workflow.getInfo()
166+
.getWorkflowType()
167+
.equals(WorkflowTaskManager.class.getSimpleName())) {
168+
createTaskInvocations++;
169+
if (createTaskInvocations == 2) {
170+
waitUntilTwoInvocationsOfCreateTask.complete(null);
171+
}
172+
173+
if (createTaskInvocations == 3) {
174+
waitUntilThreeInvocationsOfCreateTask.complete(null);
175+
}
176+
}
177+
178+
return super.executeUpdate(input);
179+
}
180+
};
181+
}
182+
}
118183
}

0 commit comments

Comments
 (0)