Skip to content

Commit e9d9d58

Browse files
Update test and test server impl.
1 parent 582b79d commit e9d9d58

6 files changed

Lines changed: 144 additions & 27 deletions

File tree

temporal-sdk/src/test/java/io/temporal/workflow/PriorityTest.java renamed to temporal-sdk/src/test/java/io/temporal/workflow/PriorityInfoTest.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,15 @@
3535
import org.junit.Rule;
3636
import org.junit.Test;
3737

38-
public class PriorityTest {
38+
public class PriorityInfoTest {
3939

4040
@Rule
4141
public SDKTestWorkflowRule testWorkflowRule =
4242
SDKTestWorkflowRule.newBuilder()
4343
.setWorkflowTypes(TestPriority.class, TestPriorityChildWorkflow.class)
4444
.setActivityImplementations(new PriorityActivitiesImpl())
45-
.setUseExternalService(true)
4645
.build();
4746

48-
// @Before
49-
// public void checkRealServer() {
50-
// assumeTrue("Test Server doesn't support priority", SDKTestWorkflowRule.useExternalService);
51-
// }
52-
5347
@Test
5448
public void testPriority() {
5549
TestWorkflow1 workflowStub =
@@ -89,6 +83,7 @@ public static class TestPriority implements TestWorkflow1 {
8983

9084
@Override
9185
public String execute(String taskQueue) {
86+
// Test that the priority is passed to activities
9287
String priority =
9388
Workflow.newActivityStub(
9489
PriorityActivities.class,
@@ -98,8 +93,8 @@ public String execute(String taskQueue) {
9893
.setPriority(Priority.newBuilder().setPriorityKey(3).build())
9994
.build())
10095
.activity1("1");
101-
Assert.assertEquals(priority, "3");
102-
96+
Assert.assertEquals("3", priority);
97+
// Test that of if no priority is set the workflows priority is used
10398
priority =
10499
Workflow.newActivityStub(
105100
PriorityActivities.class,
@@ -108,24 +103,44 @@ public String execute(String taskQueue) {
108103
.setStartToCloseTimeout(Duration.ofSeconds(10))
109104
.build())
110105
.activity1("2");
111-
Assert.assertEquals(priority, "5");
112-
106+
Assert.assertEquals("5", priority);
107+
// Test that of if a default priority is set the workflows priority is used
108+
priority =
109+
Workflow.newActivityStub(
110+
PriorityActivities.class,
111+
ActivityOptions.newBuilder()
112+
.setTaskQueue(taskQueue)
113+
.setStartToCloseTimeout(Duration.ofSeconds(10))
114+
.setPriority(Priority.newBuilder().build())
115+
.build())
116+
.activity1("2");
117+
Assert.assertEquals("5", priority);
118+
// Test that the priority is passed to child workflows
113119
priority =
114120
Workflow.newChildWorkflowStub(
115121
TestWorkflows.TestWorkflowReturnString.class,
116122
ChildWorkflowOptions.newBuilder()
117123
.setPriority(Priority.newBuilder().setPriorityKey(1).build())
118124
.build())
119125
.execute();
120-
Assert.assertEquals(priority, "1");
121-
126+
Assert.assertEquals("1", priority);
127+
// Test that of no priority is set the workflows priority is used
122128
priority =
123129
Workflow.newChildWorkflowStub(
124130
TestWorkflows.TestWorkflowReturnString.class,
125131
ChildWorkflowOptions.newBuilder().build())
126132
.execute();
127-
Assert.assertEquals(priority, "5");
128-
133+
Assert.assertEquals("5", priority);
134+
// Test that if a default priority is set the workflows priority is used
135+
priority =
136+
Workflow.newChildWorkflowStub(
137+
TestWorkflows.TestWorkflowReturnString.class,
138+
ChildWorkflowOptions.newBuilder()
139+
.setPriority(Priority.newBuilder().build())
140+
.build())
141+
.execute();
142+
Assert.assertEquals("5", priority);
143+
// Return the workflows priority
129144
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
130145
}
131146
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,10 @@ private static void initiateChildWorkflow(
11671167
if (d.hasRetryPolicy()) {
11681168
a.setRetryPolicy(d.getRetryPolicy());
11691169
}
1170+
if (d.hasPriority()) {
1171+
a.setPriority(d.getPriority());
1172+
}
1173+
11701174
HistoryEvent event =
11711175
HistoryEvent.newBuilder()
11721176
.setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
@@ -1207,6 +1211,17 @@ private static void initiateChildWorkflow(
12071211
if (d.hasInput()) {
12081212
startChild.setInput(d.getInput());
12091213
}
1214+
// If the child workflow has a priority, use it. Otherwise, use the priority of the parent
1215+
// workflow.
1216+
Priority p =
1217+
mergePriorities(
1218+
ctx.getWorkflowMutableState().getStartRequest().hasPriority()
1219+
? ctx.getWorkflowMutableState().getStartRequest().getPriority()
1220+
: null,
1221+
d.hasPriority() ? d.getPriority() : null);
1222+
if (p != null) {
1223+
startChild.setPriority(p);
1224+
}
12101225
addStartChildTask(ctx, data, initiatedEventId, startChild.build());
12111226
});
12121227
}
@@ -1293,6 +1308,9 @@ private static void startWorkflow(
12931308
if (request.hasRetryPolicy()) {
12941309
a.setRetryPolicy(request.getRetryPolicy());
12951310
}
1311+
if (request.hasPriority()) {
1312+
a.setPriority(request.getPriority());
1313+
}
12961314
data.retryState.ifPresent(
12971315
testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
12981316
a.setFirstExecutionRunId(data.firstExecutionRunId);
@@ -1515,7 +1533,9 @@ private static void scheduleActivityTask(
15151533
.setTaskQueue(d.getTaskQueue())
15161534
.setHeader(d.getHeader())
15171535
.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1518-
1536+
if (d.hasPriority()) {
1537+
a.setPriority(d.getPriority());
1538+
}
15191539
// Cannot set it in onCommit as it is used in the processScheduleActivityTask
15201540
data.scheduledEvent = a.build();
15211541
HistoryEvent.Builder event =
@@ -1543,6 +1563,17 @@ private static void scheduleActivityTask(
15431563
.setHeader(d.getHeader())
15441564
.setAttempt(1);
15451565

1566+
// If the activity has a priority, use it. Otherwise, use the priority of the workflow.
1567+
Priority p =
1568+
mergePriorities(
1569+
ctx.getWorkflowMutableState().getStartRequest().hasPriority()
1570+
? ctx.getWorkflowMutableState().getStartRequest().getPriority()
1571+
: null,
1572+
d.hasPriority() ? d.getPriority() : null);
1573+
if (p != null) {
1574+
taskResponse.setPriority(p);
1575+
}
1576+
15461577
TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
15471578
ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
15481579
ctx.addActivityTask(activityTask);
@@ -2530,4 +2561,19 @@ static RetryPolicy defaultNexusRetryPolicy() {
25302561
.setBackoffCoefficient(2.0)
25312562
.build();
25322563
}
2564+
2565+
static Priority mergePriorities(Priority parent, Priority child) {
2566+
if (child == null) {
2567+
return parent;
2568+
}
2569+
if (parent == null) {
2570+
return child;
2571+
}
2572+
Priority.Builder result = Priority.newBuilder();
2573+
result.setPriorityKey(parent.getPriorityKey());
2574+
if (child.getPriorityKey() != 0) {
2575+
result.setPriorityKey(child.getPriorityKey());
2576+
}
2577+
return result.build();
2578+
}
25332579
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TaskQueue.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
package io.temporal.internal.testservice;
2222

2323
import com.google.common.base.Preconditions;
24+
import io.temporal.api.common.v1.Priority;
25+
import java.util.Comparator;
2426
import java.util.LinkedList;
27+
import java.util.PriorityQueue;
2528
import java.util.concurrent.CancellationException;
2629
import java.util.concurrent.ExecutionException;
2730
import java.util.concurrent.Future;
@@ -37,11 +40,39 @@
3740
* @param <E>
3841
*/
3942
class TaskQueue<E> {
40-
private final LinkedList<E> backlog = new LinkedList<>();
43+
private static class TaskQueueElement<E> {
44+
private final E value;
45+
private final Priority priority;
46+
47+
TaskQueueElement(E value, Priority priority) {
48+
this.value = value;
49+
// TODO(Quinn): make this configurable
50+
this.priority =
51+
priority == Priority.getDefaultInstance()
52+
? Priority.newBuilder().setPriorityKey(3).build()
53+
: priority;
54+
}
55+
56+
TaskQueueElement(E value) {
57+
this.value = value;
58+
this.priority = Priority.newBuilder().setPriorityKey(3).build();
59+
}
60+
61+
public E getValue() {
62+
return value;
63+
}
64+
65+
public Priority getPriority() {
66+
return priority;
67+
}
68+
}
69+
70+
private final PriorityQueue<TaskQueueElement<E>> backlog =
71+
new PriorityQueue<>(Comparator.comparingInt(o -> o.getPriority().getPriorityKey()));
4172
private final LinkedList<PollFuture> waiters = new LinkedList<>();
4273

4374
/**
44-
* Adds the provided element to the tail of this queue.
75+
* Adds the provided element to the queue at the default priority.
4576
*
4677
* @param element the value to add
4778
*/
@@ -51,7 +82,22 @@ synchronized void add(E element) {
5182
return;
5283
}
5384
}
54-
backlog.push(element);
85+
backlog.add(new TaskQueueElement(element));
86+
}
87+
88+
/**
89+
* Adds the provided element to the queue at the given priority.
90+
*
91+
* @param element the value to add
92+
* @param priority the priority of the element
93+
*/
94+
synchronized void add(E element, Priority priority) {
95+
for (PollFuture future = waiters.poll(); future != null; future = waiters.pop()) {
96+
if (future.set(element)) {
97+
return;
98+
}
99+
}
100+
backlog.add(new TaskQueueElement(element, priority));
55101
}
56102

57103
/**
@@ -69,7 +115,7 @@ synchronized Future<E> poll() {
69115
waiters.push(future);
70116
return future;
71117
}
72-
element = backlog.pop();
118+
element = backlog.poll().getValue();
73119
}
74120
future.set(element);
75121
return future;

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,8 @@ public void completeWorkflowTask(
613613
stickyExecutionAttributes == null
614614
? startRequest.getTaskQueue().getName()
615615
: stickyExecutionAttributes.getWorkerTaskQueue().getName());
616-
store.sendQueryTask(executionId, taskQueueId, task);
616+
store.sendQueryTask(
617+
executionId, taskQueueId, task, getStartRequest().getPriority());
617618
this.queries.put(queryId.getQueryId(), consistent.getResult());
618619
}
619620
}
@@ -2904,7 +2905,7 @@ private QueryWorkflowResponse directQuery(QueryWorkflowRequest queryRequest, lon
29042905
? startRequest.getTaskQueue().getName()
29052906
: stickyExecutionAttributes.getWorkerTaskQueue().getName());
29062907
queries.put(queryId.getQueryId(), result);
2907-
store.sendQueryTask(executionId, taskQueueId, task);
2908+
store.sendQueryTask(executionId, taskQueueId, task, getStartRequest().getPriority());
29082909
} finally {
29092910
lock.unlock(); // locked in the query method
29102911
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStore.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.google.protobuf.Timestamp;
2424
import io.grpc.Deadline;
25+
import io.temporal.api.common.v1.Priority;
2526
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
2627
import io.temporal.api.workflowservice.v1.*;
2728
import java.time.Duration;
@@ -183,7 +184,10 @@ Future<PollActivityTaskQueueResponse.Builder> pollActivityTaskQueue(
183184
Future<NexusTask> pollNexusTaskQueue(PollNexusTaskQueueRequest pollRequest);
184185

185186
void sendQueryTask(
186-
ExecutionId executionId, TaskQueueId taskQueue, PollWorkflowTaskQueueResponse.Builder task);
187+
ExecutionId executionId,
188+
TaskQueueId taskQueue,
189+
PollWorkflowTaskQueueResponse.Builder task,
190+
Priority priority);
187191

188192
GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
189193
ExecutionId executionId,

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.protobuf.util.Timestamps;
2727
import io.grpc.Deadline;
2828
import io.grpc.Status;
29+
import io.temporal.api.common.v1.Priority;
2930
import io.temporal.api.common.v1.WorkflowExecution;
3031
import io.temporal.api.enums.v1.EventType;
3132
import io.temporal.api.enums.v1.HistoryEventFilterType;
@@ -217,15 +218,16 @@ public long save(RequestContext ctx) {
217218
}
218219
TaskQueue<PollWorkflowTaskQueueResponse.Builder> workflowTaskQueue =
219220
getWorkflowTaskQueueQueue(id);
220-
workflowTaskQueue.add(workflowTask.getTask());
221+
workflowTaskQueue.add(
222+
workflowTask.getTask(), ctx.getWorkflowMutableState().getStartRequest().getPriority());
221223
}
222224

223225
List<ActivityTask> activityTasks = ctx.getActivityTasks();
224226
if (activityTasks != null) {
225227
for (ActivityTask activityTask : activityTasks) {
226228
TaskQueue<PollActivityTaskQueueResponse.Builder> activityTaskQueue =
227229
getActivityTaskQueueQueue(activityTask.getTaskQueueId());
228-
activityTaskQueue.add(activityTask.getTask());
230+
activityTaskQueue.add(activityTask.getTask(), activityTask.getTask().getPriority());
229231
}
230232
}
231233

@@ -347,7 +349,10 @@ public Future<NexusTask> pollNexusTaskQueue(PollNexusTaskQueueRequest pollReques
347349

348350
@Override
349351
public void sendQueryTask(
350-
ExecutionId executionId, TaskQueueId taskQueue, PollWorkflowTaskQueueResponse.Builder task) {
352+
ExecutionId executionId,
353+
TaskQueueId taskQueue,
354+
PollWorkflowTaskQueueResponse.Builder task,
355+
Priority priority) {
351356
lock.lock();
352357
try {
353358
HistoryStore historyStore = getHistoryStore(executionId);
@@ -385,7 +390,7 @@ public void sendQueryTask(
385390
}
386391
TaskQueue<PollWorkflowTaskQueueResponse.Builder> workflowTaskQueue =
387392
getWorkflowTaskQueueQueue(taskQueue);
388-
workflowTaskQueue.add(task);
393+
workflowTaskQueue.add(task, priority);
389394
}
390395

391396
@Override

0 commit comments

Comments
 (0)