Skip to content

Commit fe8949a

Browse files
authored
Added support for SignalWithStart Service API (#253)
SignalWithStart allows signaling a workflow creating it if it doesn't exist atomically. In Java to support strongly typed workflow interfaces the batch object is used. Here is an example API usage: ``` QueryableWorkflow client = workflowClient.newWorkflowStub(QueryableWorkflow.class, options); BatchRequest batch = workflowClient.newSignalWithStartRequest(); batch.add(client::mySignal, "Hello "); // mySignal is annotated with @SignalMethod batch.add(client::execute); // execute is annotated with @WorkflowMethod WorkflowExecution execution = workflowClient.signalWithStart(batch); ``` Also errorprone requested removal of all unused variables.
1 parent 373ada0 commit fe8949a

37 files changed

+974
-241
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.client;
19+
20+
import com.uber.cadence.workflow.Functions;
21+
22+
/** Used to accumulate multiple operations */
23+
public interface BatchRequest {
24+
25+
/**
26+
* Executes zero argument request with void return type
27+
*
28+
* @param request The only supported value is method reference to a proxy created through {@link
29+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
30+
*/
31+
void add(Functions.Proc request);
32+
33+
/**
34+
* Executes one argument request with void return type
35+
*
36+
* @param request The only supported value is method reference to a proxy created through {@link
37+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
38+
* @param arg1 first request function parameter
39+
*/
40+
<A1> void add(Functions.Proc1<A1> request, A1 arg1);
41+
42+
/**
43+
* Executes two argument request with void return type
44+
*
45+
* @param request The only supported value is method reference to a proxy created through {@link
46+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
47+
* @param arg1 first request function parameter
48+
* @param arg2 second request function parameter
49+
*/
50+
<A1, A2> void add(Functions.Proc2<A1, A2> request, A1 arg1, A2 arg2);
51+
52+
/**
53+
* Executes three argument request with void return type
54+
*
55+
* @param request The only supported value is method reference to a proxy created through {@link
56+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
57+
* @param arg1 first request function parameter
58+
* @param arg2 second request function parameter
59+
* @param arg3 third request function parameter
60+
*/
61+
<A1, A2, A3> void add(Functions.Proc3<A1, A2, A3> request, A1 arg1, A2 arg2, A3 arg3);
62+
63+
/**
64+
* Executes four argument request with void return type
65+
*
66+
* @param request The only supported value is method reference to a proxy created through {@link
67+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
68+
* @param arg1 first request function parameter
69+
* @param arg2 second request function parameter
70+
* @param arg3 third request function parameter
71+
* @param arg4 fourth request function parameter
72+
*/
73+
<A1, A2, A3, A4> void add(
74+
Functions.Proc4<A1, A2, A3, A4> request, A1 arg1, A2 arg2, A3 arg3, A4 arg4);
75+
76+
/**
77+
* Executes five argument request with void return type
78+
*
79+
* @param request The only supported value is method reference to a proxy created through {@link
80+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
81+
* @param arg1 first request function parameter
82+
* @param arg2 second request function parameter
83+
* @param arg3 third request function parameter
84+
* @param arg4 fourth request function parameter
85+
* @param arg5 fifth request function parameter
86+
*/
87+
<A1, A2, A3, A4, A5> void add(
88+
Functions.Proc5<A1, A2, A3, A4, A5> request, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5);
89+
90+
/**
91+
* Executes six argument request with void return type
92+
*
93+
* @param request The only supported value is method reference to a proxy created through {@link
94+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
95+
* @param arg1 first request function parameter
96+
* @param arg2 second request function parameter
97+
* @param arg3 third request function parameter
98+
* @param arg4 fourth request function parameter
99+
* @param arg5 sixth request function parameter
100+
* @param arg6 sixth request function parameter
101+
*/
102+
<A1, A2, A3, A4, A5, A6> void add(
103+
Functions.Proc6<A1, A2, A3, A4, A5, A6> request,
104+
A1 arg1,
105+
A2 arg2,
106+
A3 arg3,
107+
A4 arg4,
108+
A5 arg5,
109+
A6 arg6);
110+
111+
/**
112+
* Executes zero argument request.
113+
*
114+
* @param request The only supported value is method reference to a proxy created through {@link
115+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
116+
*/
117+
void add(Functions.Func<?> request);
118+
119+
/**
120+
* Executes one argument request asynchronously.
121+
*
122+
* @param request The only supported value is method reference to a proxy created through {@link
123+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
124+
* @param arg1 first request argument
125+
*/
126+
<A1> void add(Functions.Func1<A1, ?> request, A1 arg1);
127+
128+
/**
129+
* Executes two argument request asynchronously.
130+
*
131+
* @param request The only supported value is method reference to a proxy created through {@link
132+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
133+
* @param arg1 first request function parameter
134+
* @param arg2 second request function parameter
135+
*/
136+
<A1, A2> void add(Functions.Func2<A1, A2, ?> request, A1 arg1, A2 arg2);
137+
138+
/**
139+
* Executes three argument request asynchronously.
140+
*
141+
* @param request The only supported value is method reference to a proxy created through {@link
142+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
143+
* @param arg1 first request function parameter
144+
* @param arg2 second request function parameter
145+
* @param arg3 third request function parameter
146+
*/
147+
<A1, A2, A3> void add(Functions.Func3<A1, A2, A3, ?> request, A1 arg1, A2 arg2, A3 arg3);
148+
149+
/**
150+
* Executes four argument request asynchronously.
151+
*
152+
* @param request The only supported value is method reference to a proxy created through {@link
153+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
154+
* @param arg1 first request function parameter
155+
* @param arg2 second request function parameter
156+
* @param arg3 third request function parameter
157+
* @param arg4 fourth request function parameter
158+
*/
159+
<A1, A2, A3, A4> void add(
160+
Functions.Func4<A1, A2, A3, A4, ?> request, A1 arg1, A2 arg2, A3 arg3, A4 arg4);
161+
162+
/**
163+
* Executes five argument request asynchronously.
164+
*
165+
* @param request The only supported value is method reference to a proxy created through {@link
166+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
167+
* @param arg1 first request function parameter
168+
* @param arg2 second request function parameter
169+
* @param arg3 third request function parameter
170+
* @param arg4 fourth request function parameter
171+
* @param arg5 sixth request function parameter
172+
*/
173+
<A1, A2, A3, A4, A5> void add(
174+
Functions.Func5<A1, A2, A3, A4, A5, ?> request, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5);
175+
176+
/**
177+
* Executes six argument request asynchronously.
178+
*
179+
* @param request The only supported value is method reference to a proxy created through {@link
180+
* WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}.
181+
* @param arg1 first request argument
182+
* @param arg2 second request function parameter
183+
* @param arg3 third request function parameter
184+
* @param arg4 fourth request function parameter
185+
* @param arg5 sixth request function parameter
186+
* @param arg6 sixth request function parameter
187+
*/
188+
<A1, A2, A3, A4, A5, A6> void add(
189+
Functions.Func6<A1, A2, A3, A4, A5, A6, ?> request,
190+
A1 arg1,
191+
A2 arg2,
192+
A3 arg3,
193+
A4 arg4,
194+
A5 arg5,
195+
A6 arg6);
196+
}

src/main/java/com/uber/cadence/client/WorkflowClient.java

+29-12
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,23 @@ WorkflowStub newUntypedWorkflowStub(
268268
*/
269269
ActivityCompletionClient newActivityCompletionClient();
270270

271+
/**
272+
* Creates BatchRequest that can be used to signal an existing workflow or start a new one if not
273+
* running. The batch before invocation must contain exactly two operations. One annotated
274+
* with @WorkflowMethod and another with @SignalMethod.
275+
*
276+
* @return batch request used to call {@link #signalWithStart(BatchRequest)}
277+
*/
278+
BatchRequest newSignalWithStartRequest();
279+
280+
/**
281+
* Invoke SignalWithStart operation.
282+
*
283+
* @param signalWithStartBatch Must be created with {@link #newSignalWithStartRequest()}
284+
* @return workflowID and runId of the signaled or started workflow.
285+
*/
286+
WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch);
287+
271288
/**
272289
* Executes zero argument workflow with void return type
273290
*
@@ -336,7 +353,7 @@ static <A1, A2, A3, A4> WorkflowExecution start(
336353
}
337354

338355
/**
339-
* Executes zero argument workflow with void return type
356+
* Executes five argument workflow with void return type
340357
*
341358
* @param workflow The only supported value is method reference to a proxy created through {@link
342359
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -353,7 +370,7 @@ static <A1, A2, A3, A4, A5> WorkflowExecution start(
353370
}
354371

355372
/**
356-
* Executes zero argument workflow with void return type
373+
* Executes six argument workflow with void return type
357374
*
358375
* @param workflow The only supported value is method reference to a proxy created through {@link
359376
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -414,7 +431,7 @@ static <A1, A2, R> WorkflowExecution start(
414431
}
415432

416433
/**
417-
* Executes two argument workflow asynchronously.
434+
* Executes three argument workflow asynchronously.
418435
*
419436
* @param workflow The only supported value is method reference to a proxy created through {@link
420437
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -429,7 +446,7 @@ static <A1, A2, A3, R> WorkflowExecution start(
429446
}
430447

431448
/**
432-
* Executes two argument workflow asynchronously.
449+
* Executes four argument workflow asynchronously.
433450
*
434451
* @param workflow The only supported value is method reference to a proxy created through {@link
435452
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -445,7 +462,7 @@ static <A1, A2, A3, A4, R> WorkflowExecution start(
445462
}
446463

447464
/**
448-
* Executes two argument workflow asynchronously.
465+
* Executes five argument workflow asynchronously.
449466
*
450467
* @param workflow The only supported value is method reference to a proxy created through {@link
451468
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -467,7 +484,7 @@ static <A1, A2, A3, A4, A5, R> WorkflowExecution start(
467484
}
468485

469486
/**
470-
* Executes two argument workflow asynchronously.
487+
* Executes six argument workflow asynchronously.
471488
*
472489
* @param workflow The only supported value is method reference to a proxy created through {@link
473490
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -558,7 +575,7 @@ static <A1, A2, A3, A4> CompletableFuture<Void> execute(
558575
}
559576

560577
/**
561-
* Executes zero argument workflow with void return type
578+
* Executes five argument workflow with void return type
562579
*
563580
* @param workflow The only supported value is method reference to a proxy created through {@link
564581
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -575,7 +592,7 @@ static <A1, A2, A3, A4, A5> CompletableFuture<Void> execute(
575592
}
576593

577594
/**
578-
* Executes zero argument workflow with void return type
595+
* Executes six argument workflow with void return type
579596
*
580597
* @param workflow The only supported value is method reference to a proxy created through {@link
581598
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -636,7 +653,7 @@ static <A1, A2, R> CompletableFuture<R> execute(
636653
}
637654

638655
/**
639-
* Executes two argument workflow asynchronously.
656+
* Executes three argument workflow asynchronously.
640657
*
641658
* @param workflow The only supported value is method reference to a proxy created through {@link
642659
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -651,7 +668,7 @@ static <A1, A2, A3, R> CompletableFuture<R> execute(
651668
}
652669

653670
/**
654-
* Executes two argument workflow asynchronously.
671+
* Executes four argument workflow asynchronously.
655672
*
656673
* @param workflow The only supported value is method reference to a proxy created through {@link
657674
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -667,7 +684,7 @@ static <A1, A2, A3, A4, R> CompletableFuture<R> execute(
667684
}
668685

669686
/**
670-
* Executes two argument workflow asynchronously.
687+
* Executes five argument workflow asynchronously.
671688
*
672689
* @param workflow The only supported value is method reference to a proxy created through {@link
673690
* #newWorkflowStub(Class, WorkflowOptions)}.
@@ -689,7 +706,7 @@ static <A1, A2, A3, A4, A5, R> CompletableFuture<R> execute(
689706
}
690707

691708
/**
692-
* Executes two argument workflow asynchronously.
709+
* Executes six argument workflow asynchronously.
693710
*
694711
* @param workflow The only supported value is method reference to a proxy created through {@link
695712
* #newWorkflowStub(Class, WorkflowOptions)}.

src/main/java/com/uber/cadence/client/WorkflowStub.java

+21
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.TimeoutException;
26+
import java.util.function.Supplier;
2627

2728
/**
2829
* WorkflowStub is a client side stub to a single workflow instance. It can be used to start,
@@ -32,10 +33,30 @@
3233
*/
3334
public interface WorkflowStub {
3435

36+
/**
37+
* Extracts untyped WorkflowStub from a typed workflow stub created through {@link
38+
* WorkflowClient#newWorkflowStub(Class)}.
39+
*
40+
* @param typed typed workflow stub
41+
* @param <T> type of the workflow stub interface
42+
* @return untyped workflow stub for the same workflow instance.
43+
*/
44+
static <T> WorkflowStub fromTyped(T typed) {
45+
if (!(typed instanceof Supplier)) {
46+
throw new IllegalArgumentException(
47+
"arguments must be created through WorkflowClient.newWorkflowStub");
48+
}
49+
@SuppressWarnings("unchecked")
50+
Supplier<WorkflowStub> supplier = (Supplier<WorkflowStub>) typed;
51+
return supplier.get();
52+
}
53+
3554
void signal(String signalName, Object... args);
3655

3756
WorkflowExecution start(Object... args);
3857

58+
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);
59+
3960
Optional<String> getWorkflowType();
4061

4162
WorkflowExecution getExecution();

src/main/java/com/uber/cadence/common/RetryOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public void validate() {
300300
throw new IllegalArgumentException(
301301
"both MaximumAttempts and Expiration on retry policy are not set, at least one of them must be set");
302302
}
303-
if (maximumInterval != null && maximumInterval.compareTo(initialInterval) == -1) {
303+
if (maximumInterval != null && maximumInterval.compareTo(initialInterval) < 0) {
304304
throw new IllegalStateException(
305305
"maximumInterval("
306306
+ maximumInterval

src/main/java/com/uber/cadence/converter/JsonDataConverter.java

-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ public final class JsonDataConverter implements DataConverter {
7373
"com.uber.cadence.internal.sync.POJODecisionTaskHandler$POJOWorkflowImplementation.execute");
7474

7575
private static final DataConverter INSTANCE = new JsonDataConverter();
76-
private static final byte[] EMPTY_BLOB = new byte[0];
7776
private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
7877
public static final String TYPE_FIELD_NAME = "type";
7978
public static final String JSON_CONVERTER_TYPE = "JSON";

src/main/java/com/uber/cadence/internal/common/AsyncBackoffThrottler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public CompletableFuture<Void> throttle() {
105105
}
106106
CompletableFuture<Void> result = new CompletableFuture<>();
107107
long delay = calculateSleepTime();
108-
@SuppressWarnings("FutureReturnValueIgnored")
108+
@SuppressWarnings({"FutureReturnValueIgnored", "unused"})
109109
ScheduledFuture<?> ignored =
110110
executor.schedule(() -> result.complete(null), delay, TimeUnit.MILLISECONDS);
111111
return result;

0 commit comments

Comments
 (0)