Skip to content

Commit 9c81a10

Browse files
Throw clear error if workflow stub is misused
1 parent ed2b8cc commit 9c81a10

6 files changed

Lines changed: 363 additions & 0 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityStubImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ final class ActivityStubImpl extends ActivityStubBase {
1313
private final WorkflowOutboundCallsInterceptor activityExecutor;
1414
private final Functions.Proc assertReadOnly;
1515

16+
private void assertSameWorkflow() {
17+
if (activityExecutor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
18+
throw new IllegalStateException(
19+
"Activity stub belongs to a different workflow. Create a new stub for each workflow instance.");
20+
}
21+
}
22+
1623
static ActivityStub newInstance(
1724
ActivityOptions options,
1825
WorkflowOutboundCallsInterceptor activityExecutor,
@@ -34,6 +41,7 @@ static ActivityStub newInstance(
3441
@Override
3542
public <R> Promise<R> executeAsync(
3643
String activityName, Class<R> resultClass, Type resultType, Object... args) {
44+
assertSameWorkflow();
3745
this.assertReadOnly.apply();
3846
return activityExecutor
3947
.executeActivity(

temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub {
1818
private final CompletablePromise<WorkflowExecution> execution;
1919
private final Functions.Proc1<String> assertReadOnly;
2020

21+
private void assertSameWorkflow() {
22+
if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
23+
throw new IllegalStateException(
24+
"Child workflow stub belongs to a different workflow. Create a new stub for each workflow instance.");
25+
}
26+
}
27+
2128
ChildWorkflowStubImpl(
2229
String workflowType,
2330
ChildWorkflowOptions options,
@@ -60,6 +67,7 @@ public <R> R execute(Class<R> resultClass, Object... args) {
6067

6168
@Override
6269
public <R> R execute(Class<R> resultClass, Type resultType, Object... args) {
70+
assertSameWorkflow();
6371
assertReadOnly.apply("schedule child workflow");
6472
Promise<R> result = executeAsync(resultClass, resultType, args);
6573
if (AsyncInternal.isAsync()) {
@@ -83,6 +91,7 @@ public <R> Promise<R> executeAsync(Class<R> resultClass, Object... args) {
8391

8492
@Override
8593
public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object... args) {
94+
assertSameWorkflow();
8695
assertReadOnly.apply("schedule child workflow");
8796
ChildWorkflowOutput<R> result =
8897
outboundCallsInterceptor.executeChildWorkflow(
@@ -100,6 +109,7 @@ public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object
100109

101110
@Override
102111
public void signal(String signalName, Object... args) {
112+
assertSameWorkflow();
103113
assertReadOnly.apply("signal workflow");
104114
Promise<Void> signaled =
105115
outboundCallsInterceptor

temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ class ExternalWorkflowStubImpl implements ExternalWorkflowStub {
1313
private final WorkflowExecution execution;
1414
private Functions.Proc1<String> assertReadOnly;
1515

16+
private void assertSameWorkflow() {
17+
if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
18+
throw new IllegalStateException(
19+
"External workflow stub belongs to a different workflow. Create a new stub for each workflow instance.");
20+
}
21+
}
22+
1623
public ExternalWorkflowStubImpl(
1724
WorkflowExecution execution,
1825
WorkflowOutboundCallsInterceptor outboundCallsInterceptor,
@@ -29,6 +36,7 @@ public WorkflowExecution getExecution() {
2936

3037
@Override
3138
public void signal(String signalName, Object... args) {
39+
assertSameWorkflow();
3240
assertReadOnly.apply("signal external workflow");
3341
Promise<Void> signaled =
3442
outboundCallsInterceptor
@@ -52,6 +60,7 @@ public void signal(String signalName, Object... args) {
5260

5361
@Override
5462
public void cancel() {
63+
assertSameWorkflow();
5564
assertReadOnly.apply("cancel external workflow");
5665
Promise<Void> cancelRequested =
5766
outboundCallsInterceptor

temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityStubImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ class LocalActivityStubImpl extends ActivityStubBase {
1313
private final WorkflowOutboundCallsInterceptor activityExecutor;
1414
private final Functions.Proc assertReadOnly;
1515

16+
private void assertSameWorkflow() {
17+
if (activityExecutor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
18+
throw new IllegalStateException(
19+
"Local Activity stub belongs to a different workflow. Create a new stub for each workflow instance.");
20+
}
21+
}
22+
1623
static ActivityStub newInstance(
1724
LocalActivityOptions options,
1825
WorkflowOutboundCallsInterceptor activityExecutor,
@@ -34,6 +41,7 @@ private LocalActivityStubImpl(
3441
@Override
3542
public <R> Promise<R> executeAsync(
3643
String activityName, Class<R> resultClass, Type resultType, Object... args) {
44+
assertSameWorkflow();
3745
this.assertReadOnly.apply();
3846
return activityExecutor
3947
.executeLocalActivity(

temporal-sdk/src/main/java/io/temporal/internal/sync/NexusServiceStubImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ public class NexusServiceStubImpl implements NexusServiceStub {
1313
private final WorkflowOutboundCallsInterceptor outboundCallsInterceptor;
1414
private final Functions.Proc1<String> assertReadOnly;
1515

16+
private void assertSameWorkflow() {
17+
if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
18+
throw new IllegalStateException(
19+
"Nexus service stub belongs to a different workflow. Create a new stub for each workflow instance.");
20+
}
21+
}
22+
1623
public NexusServiceStubImpl(
1724
String name,
1825
NexusServiceOptions options,
@@ -31,6 +38,7 @@ public <R> R execute(String operationName, Class<R> resultClass, Object arg) {
3138

3239
@Override
3340
public <R> R execute(String operationName, Class<R> resultClass, Type resultType, Object arg) {
41+
assertSameWorkflow();
3442
assertReadOnly.apply("execute nexus operation");
3543
Promise<R> result = executeAsync(operationName, resultClass, resultType, arg);
3644
if (AsyncInternal.isAsync()) {
@@ -55,6 +63,7 @@ public <R> Promise<R> executeAsync(String operationName, Class<R> resultClass, O
5563
@Override
5664
public <R> Promise<R> executeAsync(
5765
String operationName, Class<R> resultClass, Type resultType, Object arg) {
66+
assertSameWorkflow();
5867
assertReadOnly.apply("execute nexus operation");
5968
NexusOperationOptions mergedOptions =
6069
NexusOperationOptions.newBuilder(options.getOperationOptions())
@@ -82,6 +91,7 @@ public <R> NexusOperationHandle<R> start(String operationName, Class<R> resultCl
8291
@Override
8392
public <R> NexusOperationHandle<R> start(
8493
String operationName, Class<R> resultClass, Type resultType, Object arg) {
94+
assertSameWorkflow();
8595
assertReadOnly.apply("schedule nexus operation");
8696
NexusOperationOptions mergedOptions =
8797
NexusOperationOptions.newBuilder(options.getOperationOptions())

0 commit comments

Comments
 (0)