Skip to content

Commit cc71829

Browse files
authored
Exposing CancellationScope.run method (#261)
* Added RunnableCancellationScope * Got rid of RunnableCancellationScope * Exposed run method on CancellationScope
1 parent 8778ce7 commit cc71829

File tree

7 files changed

+67
-24
lines changed

7 files changed

+67
-24
lines changed

src/main/java/com/uber/cadence/internal/sync/CancellationScopeImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ private void setParent(CancellationScopeImpl parent) {
8787
}
8888
}
8989

90-
void run() {
90+
@Override
91+
public void run() {
9192
try {
9293
pushCurrent(this);
9394
runnable.run();

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -279,9 +279,7 @@ public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
279279
}
280280

281281
public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
282-
CancellationScopeImpl result = new CancellationScopeImpl(detached, runnable);
283-
result.run();
284-
return result;
282+
return new CancellationScopeImpl(detached, runnable);
285283
}
286284

287285
public static CancellationScopeImpl currentCancellationScope() {

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ public void setName(String name) {
189189
runnable);
190190
}
191191

192+
@Override
193+
public void run() {
194+
throw new UnsupportedOperationException("not used");
195+
}
196+
192197
@Override
193198
public boolean isDetached() {
194199
return task.cancellationScope.isDetached();

src/main/java/com/uber/cadence/workflow/CancellationScope.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* {@link Workflow#newDetachedCancellationScope(Runnable)}. Supports explicit cancelling of the code
2626
* a cancellation scope wraps.
2727
*/
28-
public interface CancellationScope {
28+
public interface CancellationScope extends Runnable {
2929

3030
/**
3131
* When set to false parent thread cancellation causes this one to get cancelled automatically.

src/main/java/com/uber/cadence/workflow/Workflow.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -537,10 +537,38 @@ public static WorkflowInfo getWorkflowInfo() {
537537
return WorkflowInternal.getWorkflowInfo();
538538
}
539539

540-
public static <R> CancellationScope newCancellationScope(Runnable runnable) {
540+
/**
541+
* Wraps the Runnable method argument in a {@link CancellationScope}. The {@link
542+
* CancellationScope#run()} calls {@link Runnable#run()} on the wrapped Runnable. The returned
543+
* CancellationScope can be used to cancel the wrapped code. The cancellation semantic depends on
544+
* the operation the code is blocked on. For example activity or child workflow is first cancelled
545+
* then throws a {@link CancellationException}. The same applies for {@link Workflow#sleep(long)}
546+
* operation. When an activity or a child workflow is invoked asynchronously then they get
547+
* cancelled and a {@link Promise} that contains their result will throw CancellationException
548+
* when {@link Promise#get()} is called.
549+
*
550+
* <p>The new cancellation scope is linked to the parent one (available as {@link
551+
* CancellationScope#current()}. If the parent one is cancelled then all the children scopes are
552+
* cancelled automatically. The main workflow function (annotated with @{@link WorkflowMethod} is
553+
* wrapped within a root cancellation scope which gets cancelled when a workflow is cancelled
554+
* through the Cadence CancelWorkflowExecution API. To perform cleanup operations that require
555+
* blocking after the current scope is cancelled use a scope created through {@link
556+
* #newDetachedCancellationScope(Runnable)}.
557+
*
558+
* @param runnable parameter to wrap in a cancellation scope.
559+
* @return wrapped parameter.
560+
*/
561+
public static CancellationScope newCancellationScope(Runnable runnable) {
541562
return WorkflowInternal.newCancellationScope(false, runnable);
542563
}
543564

565+
/**
566+
* Creates a CancellationScope that is not linked to a parent scope.
567+
*
568+
* @param runnable parameter to wrap in a cancellation scope.
569+
* @return wrapped parameter.
570+
* @see #newCancellationScope(Runnable)
571+
*/
544572
public static CancellationScope newDetachedCancellationScope(Runnable runnable) {
545573
return WorkflowInternal.newCancellationScope(true, runnable);
546574
}

src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ public void testExplicitScopeCancellation() throws Throwable {
344344
var.completeFrom(newTimer(300));
345345
trace.add("scope done");
346346
});
347+
scope.run();
347348
trace.add("root before cancel");
348349
scope.cancel("from root");
349350
try {
@@ -385,6 +386,7 @@ public void testExplicitDetachedScopeCancellation() throws Throwable {
385386
var.completeFrom(newTimer(300));
386387
trace.add("scope done");
387388
});
389+
scope.run();
388390
trace.add("root before cancel");
389391
scope.cancel("from root");
390392
try {
@@ -446,6 +448,7 @@ public void testExplicitThreadCancellation() throws Throwable {
446448
trace.add("thread done: " + cancellation.get());
447449
});
448450
});
451+
scope.run();
449452
trace.add("root before cancel");
450453
scope.cancel("from root");
451454
threadDone.get();
@@ -475,21 +478,23 @@ public void testDetachedCancellation() throws Throwable {
475478
trace.add("root started");
476479
CompletablePromise<Void> done = Workflow.newPromise();
477480
Workflow.newDetachedCancellationScope(
478-
() -> {
479-
Async.procedure(
480-
() -> {
481-
trace.add("thread started");
482-
WorkflowThread.await(
483-
"reason1",
484-
() -> unblock1 || CancellationScope.current().isCancelRequested());
485-
if (CancellationScope.current().isCancelRequested()) {
486-
done.completeExceptionally(new CancellationException());
487-
} else {
488-
done.complete(null);
489-
}
490-
trace.add("await done");
491-
});
492-
});
481+
() -> {
482+
Async.procedure(
483+
() -> {
484+
trace.add("thread started");
485+
WorkflowThread.await(
486+
"reason1",
487+
() ->
488+
unblock1 || CancellationScope.current().isCancelRequested());
489+
if (CancellationScope.current().isCancelRequested()) {
490+
done.completeExceptionally(new CancellationException());
491+
} else {
492+
done.complete(null);
493+
}
494+
trace.add("await done");
495+
});
496+
})
497+
.run();
493498
try {
494499
done.get();
495500
} catch (CancellationException e) {

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -867,21 +867,26 @@ public String execute(String taskList) {
867867
testActivities.activityWithDelay(100000, true);
868868
fail("unreachable");
869869
} catch (CancellationException e) {
870-
Workflow.newDetachedCancellationScope(() -> assertEquals(1, testActivities.activity1(1)));
870+
Workflow.newDetachedCancellationScope(() -> assertEquals(1, testActivities.activity1(1)))
871+
.run();
871872
}
872873
try {
873874
Workflow.sleep(Duration.ofHours(1));
874875
fail("unreachable");
875876
} catch (CancellationException e) {
876877
Workflow.newDetachedCancellationScope(
877-
() -> assertEquals("a12", testActivities.activity2("a1", 2)));
878+
() -> assertEquals("a12", testActivities.activity2("a1", 2)))
879+
.run();
880+
;
878881
}
879882
try {
880883
Workflow.newTimer(Duration.ofHours(1)).get();
881884
fail("unreachable");
882885
} catch (CancellationException e) {
883886
Workflow.newDetachedCancellationScope(
884-
() -> assertEquals("a123", testActivities.activity3("a1", 2, 3)));
887+
() -> assertEquals("a123", testActivities.activity3("a1", 2, 3)))
888+
.run();
889+
;
885890
}
886891
return "result";
887892
}
@@ -2680,6 +2685,7 @@ public String execute(String taskList) {
26802685
CancellationScope scope =
26812686
Workflow.newCancellationScope(
26822687
() -> signal.completeFrom(Async.procedure(workflow::signal1, "World")));
2688+
scope.run();
26832689
scope.cancel();
26842690
try {
26852691
signal.get();

0 commit comments

Comments
 (0)