Skip to content

Commit 71259cb

Browse files
authored
Cron child workflow (#264)
* Add cron schedule to child workflow option * Fix unit test * Review comments
1 parent b06190d commit 71259cb

11 files changed

+180
-76
lines changed

src/main/java/com/uber/cadence/internal/replay/StartChildWorkflowExecutionParameters.java

+24-4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public static final class Builder {
5050

5151
private RetryParameters retryParameters;
5252

53+
private String cronSchedule;
54+
5355
public Builder setDomain(String domain) {
5456
this.domain = domain;
5557
return this;
@@ -106,6 +108,11 @@ public Builder setRetryParameters(RetryParameters retryParameters) {
106108
return this;
107109
}
108110

111+
public Builder setCronSchedule(String cronSchedule) {
112+
this.cronSchedule = cronSchedule;
113+
return this;
114+
}
115+
109116
public StartChildWorkflowExecutionParameters build() {
110117
return new StartChildWorkflowExecutionParameters(
111118
domain,
@@ -118,7 +125,8 @@ public StartChildWorkflowExecutionParameters build() {
118125
workflowType,
119126
childPolicy,
120127
workflowIdReusePolicy,
121-
retryParameters);
128+
retryParameters,
129+
cronSchedule);
122130
}
123131
}
124132

@@ -144,6 +152,8 @@ public StartChildWorkflowExecutionParameters build() {
144152

145153
private final RetryParameters retryParameters;
146154

155+
private final String cronSchedule;
156+
147157
private StartChildWorkflowExecutionParameters(
148158
String domain,
149159
byte[] input,
@@ -155,7 +165,8 @@ private StartChildWorkflowExecutionParameters(
155165
WorkflowType workflowType,
156166
ChildPolicy childPolicy,
157167
WorkflowIdReusePolicy workflowIdReusePolicy,
158-
RetryParameters retryParameters) {
168+
RetryParameters retryParameters,
169+
String cronSchedule) {
159170
this.domain = domain;
160171
this.input = input;
161172
this.control = control;
@@ -167,6 +178,7 @@ private StartChildWorkflowExecutionParameters(
167178
this.childPolicy = childPolicy;
168179
this.workflowIdReusePolicy = workflowIdReusePolicy;
169180
this.retryParameters = retryParameters;
181+
this.cronSchedule = cronSchedule;
170182
}
171183

172184
public String getDomain() {
@@ -213,6 +225,10 @@ public RetryParameters getRetryParameters() {
213225
return retryParameters;
214226
}
215227

228+
public String getCronSchedule() {
229+
return cronSchedule;
230+
}
231+
216232
@Override
217233
public boolean equals(Object o) {
218234
if (this == o) return true;
@@ -228,7 +244,8 @@ public boolean equals(Object o) {
228244
&& Objects.equals(workflowType, that.workflowType)
229245
&& childPolicy == that.childPolicy
230246
&& workflowIdReusePolicy == that.workflowIdReusePolicy
231-
&& Objects.equals(retryParameters, that.retryParameters);
247+
&& Objects.equals(retryParameters, that.retryParameters)
248+
&& Objects.equals(cronSchedule, that.cronSchedule);
232249
}
233250

234251
@Override
@@ -244,7 +261,8 @@ public int hashCode() {
244261
workflowType,
245262
childPolicy,
246263
workflowIdReusePolicy,
247-
retryParameters);
264+
retryParameters,
265+
cronSchedule);
248266
result = 31 * result + Arrays.hashCode(input);
249267
return result;
250268
}
@@ -278,6 +296,8 @@ public String toString() {
278296
+ workflowIdReusePolicy
279297
+ ", retryParameters="
280298
+ retryParameters
299+
+ ", cronSchedule="
300+
+ cronSchedule
281301
+ '}';
282302
}
283303
}

src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.google.common.base.Strings;
2021
import com.uber.cadence.ChildPolicy;
2122
import com.uber.cadence.ChildWorkflowExecutionCanceledEventAttributes;
2223
import com.uber.cadence.ChildWorkflowExecutionCompletedEventAttributes;
@@ -147,6 +148,11 @@ Consumer<Exception> startChildWorkflow(
147148
if (retryParameters != null) {
148149
attributes.setRetryPolicy(retryParameters.toRetryPolicy());
149150
}
151+
152+
if (!Strings.isNullOrEmpty(parameters.getCronSchedule())) {
153+
attributes.setCronSchedule(parameters.getCronSchedule());
154+
}
155+
150156
long initiatedEventId = decisions.startChildWorkflowExecution(attributes);
151157
final OpenChildWorkflowRequestInfo context =
152158
new OpenChildWorkflowRequestInfo(executionCallback);

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowInvocationHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.uber.cadence.internal.common.InternalUtils.getWorkflowMethod;
2222
import static com.uber.cadence.internal.common.InternalUtils.getWorkflowType;
2323

24+
import com.uber.cadence.common.CronSchedule;
2425
import com.uber.cadence.common.MethodRetry;
2526
import com.uber.cadence.internal.common.InternalUtils;
2627
import com.uber.cadence.workflow.ChildWorkflowOptions;
@@ -45,9 +46,10 @@ class ChildWorkflowInvocationHandler implements InvocationHandler {
4546
WorkflowMethod workflowAnnotation = workflowMethod.getAnnotation(WorkflowMethod.class);
4647
String workflowType = getWorkflowType(workflowMethod, workflowAnnotation);
4748
MethodRetry retryAnnotation = workflowMethod.getAnnotation(MethodRetry.class);
49+
CronSchedule cronSchedule = workflowMethod.getAnnotation(CronSchedule.class);
4850

4951
ChildWorkflowOptions merged =
50-
ChildWorkflowOptions.merge(workflowAnnotation, retryAnnotation, options);
52+
ChildWorkflowOptions.merge(workflowAnnotation, retryAnnotation, cronSchedule, options);
5153
this.stub = new ChildWorkflowStubImpl(workflowType, merged, decisionContext);
5254
}
5355

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,6 @@ public void addActivityImplementation(Object activity) {
107107

108108
private ActivityTaskHandler.Result mapToActivityFailure(
109109
String activityType, Throwable failure, Scope metricsScope) {
110-
if (failure instanceof Error) {
111-
Map<String, String> tags =
112-
new ImmutableMap.Builder<String, String>(1)
113-
.put(MetricsTag.ACTIVITY_TYPE, activityType)
114-
.build();
115-
metricsScope.tagged(tags).counter(MetricsType.ACTIVITY_TASK_ERROR_COUNTER).inc(1);
116-
throw (Error) failure;
117-
}
118110

119111
if (failure instanceof ActivityCancelledException) {
120112
throw new CancellationException(failure.getMessage());
@@ -129,7 +121,16 @@ private ActivityTaskHandler.Result mapToActivityFailure(
129121
dataConverter.toData(timeoutException.getDetails()));
130122
}
131123

132-
metricsScope.counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
124+
Map<String, String> activityTypeTag =
125+
new ImmutableMap.Builder<String, String>(1)
126+
.put(MetricsTag.ACTIVITY_TYPE, activityType)
127+
.build();
128+
if (failure instanceof Error) {
129+
metricsScope.tagged(activityTypeTag).counter(MetricsType.ACTIVITY_TASK_ERROR_COUNTER).inc(1);
130+
throw (Error) failure;
131+
}
132+
133+
metricsScope.tagged(activityTypeTag).counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
133134
RespondActivityTaskFailedRequest result = new RespondActivityTaskFailedRequest();
134135
failure = CheckedExceptionWrapper.unwrap(failure);
135136
result.setReason(failure.getClass().getName());

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

+1
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ private Promise<byte[]> executeChildWorkflowOnce(
296296
.setTaskStartToCloseTimeoutSeconds(options.getTaskStartToCloseTimeout().getSeconds())
297297
.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
298298
.setRetryParameters(retryParameters)
299+
.setCronSchedule(options.getCronSchedule())
299300
.build();
300301
CompletablePromise<byte[]> result = Workflow.newPromise();
301302
Consumer<Exception> cancellationCallback =

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,8 @@ private static void initiateChildWorkflow(
446446
.setWorkflowId(d.getWorkflowId())
447447
.setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
448448
.setWorkflowType(d.getWorkflowType())
449-
.setRetryPolicy(d.getRetryPolicy());
449+
.setRetryPolicy(d.getRetryPolicy())
450+
.setCronSchedule(d.getCronSchedule());
450451
HistoryEvent event =
451452
new HistoryEvent()
452453
.setEventType(EventType.StartChildWorkflowExecutionInitiated)
@@ -466,7 +467,8 @@ private static void initiateChildWorkflow(
466467
.setWorkflowId(d.getWorkflowId())
467468
.setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
468469
.setWorkflowType(d.getWorkflowType())
469-
.setRetryPolicy(d.getRetryPolicy());
470+
.setRetryPolicy(d.getRetryPolicy())
471+
.setCronSchedule(d.getCronSchedule());
470472
if (d.isSetInput()) {
471473
startChild.setInput(d.getInput());
472474
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,31 @@ private void processDecision(
344344
processRecordMarker(ctx, d.getRecordMarkerDecisionAttributes(), decisionTaskCompletedId);
345345
break;
346346
case RequestCancelExternalWorkflowExecution:
347-
throw new InternalServiceError(
348-
"Decision " + d.getDecisionType() + " is not yet " + "implemented");
347+
processRequestCancelExternalWorkflowExecution(
348+
ctx, d.getRequestCancelExternalWorkflowExecutionDecisionAttributes());
349+
break;
349350
}
350351
}
351352

353+
private void processRequestCancelExternalWorkflowExecution(
354+
RequestContext ctx, RequestCancelExternalWorkflowExecutionDecisionAttributes attr) {
355+
ForkJoinPool.commonPool()
356+
.execute(
357+
() -> {
358+
RequestCancelWorkflowExecutionRequest request =
359+
new RequestCancelWorkflowExecutionRequest();
360+
WorkflowExecution workflowExecution = new WorkflowExecution();
361+
workflowExecution.setWorkflowId(attr.workflowId);
362+
request.setWorkflowExecution(workflowExecution);
363+
request.setDomain(ctx.getDomain());
364+
try {
365+
service.RequestCancelWorkflowExecution(request);
366+
} catch (Exception e) {
367+
log.error("Failure to request cancel external workflow", e);
368+
}
369+
});
370+
}
371+
352372
private void processRecordMarker(
353373
RequestContext ctx, RecordMarkerDecisionAttributes attr, long decisionTaskCompletedId)
354374
throws BadRequestError {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.uber.cadence.internal.testservice;
1919

2020
import com.uber.cadence.BadRequestError;
21-
import com.uber.cadence.CancellationAlreadyRequestedError;
2221
import com.uber.cadence.DeprecateDomainRequest;
2322
import com.uber.cadence.DescribeDomainRequest;
2423
import com.uber.cadence.DescribeDomainResponse;
@@ -476,8 +475,7 @@ public void RespondActivityTaskCanceledByID(
476475

477476
@Override
478477
public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest)
479-
throws BadRequestError, InternalServiceError, EntityNotExistsError,
480-
CancellationAlreadyRequestedError, ServiceBusyError, TException {
478+
throws TException {
481479
ExecutionId executionId =
482480
new ExecutionId(cancelRequest.getDomain(), cancelRequest.getWorkflowExecution());
483481
TestWorkflowMutableState mutableState = getMutableState(executionId);
@@ -486,8 +484,7 @@ public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest
486484

487485
@Override
488486
public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest)
489-
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
490-
TException {
487+
throws TException {
491488
ExecutionId executionId =
492489
new ExecutionId(signalRequest.getDomain(), signalRequest.getWorkflowExecution());
493490
TestWorkflowMutableState mutableState = getMutableState(executionId);

0 commit comments

Comments
 (0)