Skip to content

Commit 0880afa

Browse files
vancexuLiang Mei
and
Liang Mei
committed
Add async start to untype stub (#506)
Co-authored-by: Liang Mei <[email protected]>
1 parent 3dcf53c commit 0880afa

12 files changed

+397
-49
lines changed

src/main/java/com/uber/cadence/client/WorkflowStub.java

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ static <T> WorkflowStub fromTyped(T typed) {
6767

6868
WorkflowExecution start(Object... args);
6969

70+
CompletableFuture<WorkflowExecution> startAsync(Object... args);
71+
72+
CompletableFuture<WorkflowExecution> startAsyncWithTimeout(
73+
long timeout, TimeUnit unit, Object... args);
74+
7075
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);
7176

7277
Optional<String> getWorkflowType();

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java

+7
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,19 @@
2626
import com.uber.cadence.internal.replay.QueryWorkflowParameters;
2727
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
2828
import com.uber.cadence.serviceclient.IWorkflowService;
29+
import java.util.concurrent.CompletableFuture;
2930

3031
public interface GenericWorkflowClientExternal {
3132

3233
WorkflowExecution startWorkflow(StartWorkflowExecutionParameters startParameters)
3334
throws WorkflowExecutionAlreadyStartedError;
3435

36+
CompletableFuture<WorkflowExecution> startWorkflowAsync(
37+
StartWorkflowExecutionParameters startParameters);
38+
39+
CompletableFuture<WorkflowExecution> startWorkflowAsync(
40+
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis);
41+
3542
void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);
3643

3744
WorkflowExecution signalWithStartWorkflowExecution(

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

+98-28
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.uber.cadence.WorkflowExecution;
3535
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
3636
import com.uber.cadence.WorkflowQuery;
37+
import com.uber.cadence.common.RetryOptions;
3738
import com.uber.cadence.internal.common.*;
3839
import com.uber.cadence.internal.metrics.MetricsTag;
3940
import com.uber.cadence.internal.metrics.MetricsType;
@@ -43,10 +44,13 @@
4344
import com.uber.m3.tally.Scope;
4445
import com.uber.m3.util.ImmutableMap;
4546
import java.nio.ByteBuffer;
47+
import java.time.Duration;
4648
import java.util.HashMap;
4749
import java.util.Map;
4850
import java.util.UUID;
51+
import java.util.concurrent.CompletableFuture;
4952
import org.apache.thrift.TException;
53+
import org.apache.thrift.async.AsyncMethodCallback;
5054

5155
public final class GenericWorkflowClientExternalImpl implements GenericWorkflowClientExternal {
5256

@@ -77,19 +81,104 @@ public WorkflowExecution startWorkflow(StartWorkflowExecutionParameters startPar
7781
try {
7882
return startWorkflowInternal(startParameters);
7983
} finally {
80-
// TODO: can probably cache this
81-
Map<String, String> tags =
82-
new ImmutableMap.Builder<String, String>(3)
83-
.put(MetricsTag.WORKFLOW_TYPE, startParameters.getWorkflowType().getName())
84-
.put(MetricsTag.TASK_LIST, startParameters.getTaskList())
85-
.put(MetricsTag.DOMAIN, domain)
86-
.build();
87-
metricsScope.tagged(tags).counter(MetricsType.WORKFLOW_START_COUNTER).inc(1);
84+
emitMetricsForStartWorkflow(startParameters);
8885
}
8986
}
9087

88+
@Override
89+
public CompletableFuture<WorkflowExecution> startWorkflowAsync(
90+
StartWorkflowExecutionParameters startParameters) {
91+
92+
return startWorkflowAsync(startParameters, Long.MAX_VALUE);
93+
}
94+
95+
@Override
96+
public CompletableFuture<WorkflowExecution> startWorkflowAsync(
97+
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) {
98+
99+
emitMetricsForStartWorkflow(startParameters);
100+
return startWorkflowAsyncInternal(startParameters, timeoutInMillis);
101+
}
102+
103+
private void emitMetricsForStartWorkflow(StartWorkflowExecutionParameters startParameters) {
104+
// TODO: can probably cache this
105+
Map<String, String> tags =
106+
new ImmutableMap.Builder<String, String>(3)
107+
.put(MetricsTag.WORKFLOW_TYPE, startParameters.getWorkflowType().getName())
108+
.put(MetricsTag.TASK_LIST, startParameters.getTaskList())
109+
.put(MetricsTag.DOMAIN, domain)
110+
.build();
111+
metricsScope.tagged(tags).counter(MetricsType.WORKFLOW_START_COUNTER).inc(1);
112+
}
113+
91114
private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters startParameters)
92115
throws WorkflowExecutionAlreadyStartedError {
116+
117+
StartWorkflowExecutionRequest request = getStartRequest(startParameters);
118+
StartWorkflowExecutionResponse result;
119+
try {
120+
result =
121+
Retryer.retryWithResult(
122+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
123+
() -> service.StartWorkflowExecution(request));
124+
} catch (WorkflowExecutionAlreadyStartedError e) {
125+
throw e;
126+
} catch (TException e) {
127+
throw CheckedExceptionWrapper.wrap(e);
128+
}
129+
WorkflowExecution execution = new WorkflowExecution();
130+
execution.setRunId(result.getRunId());
131+
execution.setWorkflowId(request.getWorkflowId());
132+
133+
return execution;
134+
}
135+
136+
private RetryOptions getRetryOptionsWithExpiration(RetryOptions o, Long timeoutInMillis) {
137+
if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) {
138+
return o;
139+
}
140+
return new RetryOptions.Builder(Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS)
141+
.setExpiration(Duration.ofMillis((timeoutInMillis)))
142+
.build();
143+
}
144+
145+
private CompletableFuture<WorkflowExecution> startWorkflowAsyncInternal(
146+
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) {
147+
StartWorkflowExecutionRequest request = getStartRequest(startParameters);
148+
149+
return Retryer.retryWithResultAsync(
150+
getRetryOptionsWithExpiration(
151+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
152+
() -> {
153+
CompletableFuture<WorkflowExecution> result = new CompletableFuture<>();
154+
try {
155+
156+
service.StartWorkflowExecutionWithTimeout(
157+
request,
158+
new AsyncMethodCallback<StartWorkflowExecutionResponse>() {
159+
@Override
160+
public void onComplete(StartWorkflowExecutionResponse response) {
161+
WorkflowExecution execution = new WorkflowExecution();
162+
execution.setRunId(response.getRunId());
163+
execution.setWorkflowId(request.getWorkflowId());
164+
result.complete(execution);
165+
}
166+
167+
@Override
168+
public void onError(Exception exception) {
169+
result.completeExceptionally(exception);
170+
}
171+
},
172+
timeoutInMillis);
173+
} catch (TException e) {
174+
result.completeExceptionally(e);
175+
}
176+
return result;
177+
});
178+
}
179+
180+
private StartWorkflowExecutionRequest getStartRequest(
181+
StartWorkflowExecutionParameters startParameters) {
93182
StartWorkflowExecutionRequest request = new StartWorkflowExecutionRequest();
94183
request.setDomain(domain);
95184
if (startParameters.getInput() != null) {
@@ -124,26 +213,7 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
124213
request.setSearchAttributes(toSearchAttributesThrift(startParameters.getSearchAttributes()));
125214
request.setHeader(toHeaderThrift(startParameters.getContext()));
126215

127-
// if(startParameters.getChildPolicy() != null) {
128-
// request.setChildPolicy(startParameters.getChildPolicy());
129-
// }
130-
131-
StartWorkflowExecutionResponse result;
132-
try {
133-
result =
134-
Retryer.retryWithResult(
135-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
136-
() -> service.StartWorkflowExecution(request));
137-
} catch (WorkflowExecutionAlreadyStartedError e) {
138-
throw e;
139-
} catch (TException e) {
140-
throw CheckedExceptionWrapper.wrap(e);
141-
}
142-
WorkflowExecution execution = new WorkflowExecution();
143-
execution.setRunId(result.getRunId());
144-
execution.setWorkflowId(request.getWorkflowId());
145-
146-
return execution;
216+
return request;
147217
}
148218

149219
private Memo toMemoThrift(Map<String, byte[]> memo) {

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+9
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,15 @@ public void StartWorkflowExecution(
576576
impl.StartWorkflowExecution(startRequest, resultHandler);
577577
}
578578

579+
@Override
580+
public void StartWorkflowExecutionWithTimeout(
581+
StartWorkflowExecutionRequest startRequest,
582+
AsyncMethodCallback resultHandler,
583+
Long timeoutInMillis)
584+
throws TException {
585+
impl.StartWorkflowExecutionWithTimeout(startRequest, resultHandler, timeoutInMillis);
586+
}
587+
579588
@Override
580589
public void GetWorkflowExecutionHistory(
581590
GetWorkflowExecutionHistoryRequest getRequest, AsyncMethodCallback resultHandler)

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

+20
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,15 @@ public void StartWorkflowExecution(
427427
impl.StartWorkflowExecution(startRequest, resultHandler);
428428
}
429429

430+
@Override
431+
public void StartWorkflowExecutionWithTimeout(
432+
StartWorkflowExecutionRequest startRequest,
433+
AsyncMethodCallback resultHandler,
434+
Long timeoutInMillis)
435+
throws TException {
436+
impl.StartWorkflowExecutionWithTimeout(startRequest, resultHandler, timeoutInMillis);
437+
}
438+
430439
@Override
431440
public void GetWorkflowExecutionHistory(
432441
GetWorkflowExecutionHistoryRequest getRequest, AsyncMethodCallback resultHandler)
@@ -802,6 +811,17 @@ public WorkflowExecution start(Object... args) {
802811
return next.start(args);
803812
}
804813

814+
@Override
815+
public CompletableFuture<WorkflowExecution> startAsync(Object... args) {
816+
return next.startAsync(args);
817+
}
818+
819+
@Override
820+
public CompletableFuture<WorkflowExecution> startAsyncWithTimeout(
821+
long timeout, TimeUnit unit, Object... args) {
822+
return next.startAsyncWithTimeout(timeout, unit, args);
823+
}
824+
805825
@Override
806826
public WorkflowExecution signalWithStart(
807827
String signalName, Object[] signalArgs, Object[] startArgs) {

src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java

+33
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {
128128
return execution.get();
129129
}
130130

131+
private CompletableFuture<WorkflowExecution> startAsyncWithOptions(
132+
long timeout, TimeUnit unit, WorkflowOptions o, Object... args) {
133+
StartWorkflowExecutionParameters p = getStartWorkflowExecutionParameters(o, args);
134+
return genericClient.startWorkflowAsync(p, unit.toMillis(timeout));
135+
}
136+
131137
private StartWorkflowExecutionParameters getStartWorkflowExecutionParameters(
132138
WorkflowOptions o, Object[] args) {
133139
if (execution.get() != null) {
@@ -195,6 +201,33 @@ public WorkflowExecution start(Object... args) {
195201
return startWithOptions(WorkflowOptions.merge(null, null, null, options.get()), args);
196202
}
197203

204+
@Override
205+
public CompletableFuture<WorkflowExecution> startAsync(Object... args) {
206+
return startAsyncWithTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS, args);
207+
}
208+
209+
@Override
210+
public CompletableFuture<WorkflowExecution> startAsyncWithTimeout(
211+
long timeout, TimeUnit unit, Object... args) {
212+
if (!options.isPresent()) {
213+
throw new IllegalStateException("Required parameter WorkflowOptions is missing");
214+
}
215+
216+
CompletableFuture<WorkflowExecution> result =
217+
startAsyncWithOptions(
218+
timeout, unit, WorkflowOptions.merge(null, null, null, options.get()), args);
219+
result.whenComplete(
220+
(input, exception) -> {
221+
if (input != null) {
222+
execution.set(
223+
new WorkflowExecution()
224+
.setWorkflowId(input.getWorkflowId())
225+
.setRunId(input.getRunId()));
226+
}
227+
});
228+
return result;
229+
}
230+
198231
private WorkflowExecution signalWithStartWithOptions(
199232
WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) {
200233
StartWorkflowExecutionParameters sp = getStartWorkflowExecutionParameters(options, startArgs);

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,24 @@ public void DeprecateDomain(
797797
public void StartWorkflowExecution(
798798
StartWorkflowExecutionRequest startRequest, AsyncMethodCallback resultHandler)
799799
throws TException {
800-
throw new UnsupportedOperationException("not implemented");
800+
StartWorkflowExecutionWithTimeout(startRequest, resultHandler, null);
801+
}
802+
803+
@Override
804+
public void StartWorkflowExecutionWithTimeout(
805+
StartWorkflowExecutionRequest startRequest,
806+
AsyncMethodCallback resultHandler,
807+
Long timeoutInMillis)
808+
throws TException {
809+
forkJoinPool.execute(
810+
() -> {
811+
try {
812+
StartWorkflowExecutionResponse result = StartWorkflowExecution(startRequest);
813+
resultHandler.onComplete(result);
814+
} catch (TException e) {
815+
resultHandler.onError(e);
816+
}
817+
});
801818
}
802819

803820
@SuppressWarnings("unchecked") // Generator ignores that AsyncMethodCallback is generic

src/main/java/com/uber/cadence/reporter/CadenceClientStatsReporter.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,14 @@ public void reportCounter(String name, Map<String, String> tags, long value) {
5959

6060
@Override
6161
public void reportGauge(String name, Map<String, String> tags, double value) {
62-
AtomicDouble gauge = gauges.computeIfAbsent(name, metricName -> {
63-
AtomicDouble result = Metrics.gauge(name, getTags(tags), new AtomicDouble());
64-
Preconditions.checkNotNull(result, "Metrics.gauge should not return null ever");
65-
return result;
66-
});
62+
AtomicDouble gauge =
63+
gauges.computeIfAbsent(
64+
name,
65+
metricName -> {
66+
AtomicDouble result = Metrics.gauge(name, getTags(tags), new AtomicDouble());
67+
Preconditions.checkNotNull(result, "Metrics.gauge should not return null ever");
68+
return result;
69+
});
6770
gauge.set(value);
6871
}
6972

@@ -99,7 +102,6 @@ private Iterable<Tag> getTags(Map<String, String> tags) {
99102
Tag.of(MetricsTag.ACTIVITY_TYPE, Strings.nullToEmpty(tags.get(MetricsTag.ACTIVITY_TYPE))),
100103
Tag.of(MetricsTag.DOMAIN, Strings.nullToEmpty(tags.get(MetricsTag.DOMAIN))),
101104
Tag.of(MetricsTag.TASK_LIST, Strings.nullToEmpty(tags.get(MetricsTag.TASK_LIST))),
102-
Tag.of(MetricsTag.WORKFLOW_TYPE, Strings.nullToEmpty(tags.get(MetricsTag.WORKFLOW_TYPE)))
103-
);
105+
Tag.of(MetricsTag.WORKFLOW_TYPE, Strings.nullToEmpty(tags.get(MetricsTag.WORKFLOW_TYPE))));
104106
}
105107
}

src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java

+16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
2121
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
22+
import com.uber.cadence.StartWorkflowExecutionRequest;
2223
import com.uber.cadence.WorkflowService.AsyncIface;
2324
import com.uber.cadence.WorkflowService.Iface;
2425
import org.apache.thrift.TException;
@@ -27,6 +28,21 @@
2728
public interface IWorkflowService extends Iface, AsyncIface {
2829
void close();
2930

31+
/**
32+
* StartWorkflowExecutionWithTimeout start workflow same as StartWorkflowExecution but with
33+
* timeout
34+
*
35+
* @param startRequest
36+
* @param resultHandler
37+
* @param timeoutInMillis
38+
* @throws TException
39+
*/
40+
void StartWorkflowExecutionWithTimeout(
41+
StartWorkflowExecutionRequest startRequest,
42+
AsyncMethodCallback resultHandler,
43+
Long timeoutInMillis)
44+
throws TException;
45+
3046
/**
3147
* GetWorkflowExecutionHistoryWithTimeout get workflow history same as GetWorkflowExecutionHistory
3248
* but with timeout.

0 commit comments

Comments
 (0)