Skip to content

Commit 3892394

Browse files
💥 Standalone Activities for Java (#2858)
* vc * Integration tests. * Options .toBuilder() should duplicate 'this'. * equals and hashCode for StartActivityOptions * Only throw ActivityAlreadyStartedException if error detail is present * Same for getActivityResult * equals/hashcode/tostring for ActivityExecution Remove isEmpty() check from isWorkflowActivity. * use-api-design * match fully * More tests * Rename ActivityExecution to ActivityExecutionMetadata. * Store ActivityExecutionInfo instead of DescribeActivityExecutionResponse. * Rename getRetryPolicy to getRetryOptions. * Missing methods in ActivityExecutionDescription. * Correct naming for interceptor interfaces. * Update ActivityInfo interface. * More tests. * Tests for standalone completion client methods. * Interceptor tests. * Nullable fields. * swap args * Rename IsInWorkflow to IsWorkflowActivity. * Test: contextPropagatorValueAppearsInStartActivityHeader. * Revert "Test: contextPropagatorValueAppearsInStartActivityHeader." This reverts commit 381e66c. * ActivityClientInterceptor and ActivityClientInterceptorBase. * Correct order for activity interceptors. * Rename to initializeClientInvoker. * Javadoc warnings, unused classes, a deprecated method. * Remove runtime rethrow. * Pass null instead of "" for summary/details. * Remove section dividers. * useless test comments * Enable SAA in test server. * Remove trivial tests. * wip * Rename to ActivityClientCallsInterceptorBaseTest. * Non-mocked completion client tests. * Tweak javadoc * Box for capturing * Box for capturing * MethodExtractor * All overloads of same * Refactor ME * .start() pulls headers from context propagators. * Distinguish standalone method names in completion client. * Use long poll token in describeActivity. * Expose lastFailure and raw proto on ActivityExecutionDescription. * Completion handled at gPRC layer. * Cache for ActivityHandleImpl.getResultAsync. * Rename ActivityHandleWrapper to ActivityHandleImpl. * Javadoc for ActivityClientCallsInterceptor * ActivityException base class * Remove throws clauses for unchecked exceptions * Don't expose ActivityClient.listExecutionsPaginated * Examples for all overloads in ActivityClient * Update temporal-sdk/src/main/java/io/temporal/client/ActivityClient.java Co-authored-by: Quinn Klassen <klassenq@gmail.com> * Finish removing implementation detail in comment. * Remove trivial tests * Update testFromUntypedWithExplicitTypePassesTypeToInterceptor to make sure type is forwarded * testGetActivityResultAsyncDelegatesToNext * Javadoc: missing import * Revert "Use long poll token in describeActivity." This reverts commit 749a8c7. * Address small review changes * Default implementations for some untyped execute/executeAsync * Remove ActivityCountOptions * Apply DataConverter to results of .getLastFailure * Polling can outlast server timeout * Remove ActivityListOptions and ActivityListPaginatedOptions; keep both listActivities and listActivitiesPaginated in the interceptor * getResult/Async continue polling across the server's long-poll boundary * getResult/Async continue polling across the server's long-poll boundary and the client's rpc boundary (this and previous) * Split slow tests for speed * Increase timeout tolerance for GetActivityResultOverLongPollWaitTest. * Remaining two default overloads of execute/executeAsync, and remove redundant Impl implementations. * Remove unused listActivitiesPaginated interceptor * Add overload to ActivityHandle.getResult with timeout. Implemented this by adding a Deadline to RootActivityClientInvoker.getActivityResult. Also: In tests, use .start(...).get() instead of .executeAsync, for clarity. Added stopwatch check to testGetActivityResultAsyncTimeoutAbortsPolling to avoid possibility of false negative. * TODO * getWorkflowNamespace returns null if no workflow execution * Update comment on ActivityInfo for getActivityNamespace() returning null for SAA. * Remove result caching, will revisit later. * Rename isWorkflowActivity to isInWorkflow * Remove default ActivityClientCallsInterceptor.getActivityResultAsync * Move Func7 to the correct spot * Remove internal ActivityExecutionDescription.info() * Tests that check that both only scheduleToCloseTimeout and only startToCloseTimeout are allowed. * ActivityClientCallsInterceptorChainTest: confirm ordering for 2 interceptors with real ActivityClient. * Added tests testGetHandleWithNullRunId and testVoidExecuteOverloadDiscardsNonVoidResult. Renames: ActivityInfoStandaloneTest -> ActivityInfoImplTest, testDescribeNoToken -> testDescribe in ActivityHandleImplTest. --------- Co-authored-by: Quinn Klassen <klassenq@gmail.com>
1 parent 4b2c294 commit 3892394

55 files changed

Lines changed: 7597 additions & 27 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

‎.github/workflows/ci.yml‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ jobs:
110110
--dynamic-config-value history.enableRequestIdRefLinks=true \
111111
--dynamic-config-value frontend.WorkerHeartbeatsEnabled=true \
112112
--dynamic-config-value frontend.ListWorkersEnabled=true \
113-
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
113+
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
114+
--dynamic-config-value frontend.activityAPIsEnabled=true \
115+
--dynamic-config-value activity.enableStandalone=true \
116+
--dynamic-config-value history.enableChasm=true \
117+
--dynamic-config-value history.enableTransitionHistory=true &
114118
sleep 10s
115119
116120
# Can't actually run tests against Java 8 because Mockito 5 requires Java 11+.

‎contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public ActivityOutput execute(ActivityInput input) {
5151
tracer,
5252
activityInfo.getActivityType(),
5353
activityInfo.getWorkflowId(),
54-
activityInfo.getRunId(),
54+
activityInfo.getWorkflowRunId(),
5555
rootSpanContext)
5656
.start();
5757
try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) {

‎temporal-sdk/src/main/java/io/temporal/activity/ActivityInfo.java‎

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.time.Duration;
88
import java.util.Optional;
99
import javax.annotation.Nonnull;
10+
import javax.annotation.Nullable;
1011

1112
/**
1213
* Information about the Activity Task that the current Activity Execution is handling. Use {@link
@@ -21,15 +22,35 @@ public interface ActivityInfo {
2122
byte[] getTaskToken();
2223

2324
/**
24-
* @return WorkflowId of the Workflow Execution that scheduled the Activity Execution.
25+
* @return WorkflowId of the Workflow Execution that scheduled the Activity Execution, or {@code
26+
* null} for standalone activities not scheduled by a workflow.
2527
*/
28+
@Nullable
2629
String getWorkflowId();
2730

2831
/**
29-
* @return RunId of the Workflow Execution that scheduled the Activity Execution.
32+
* @return RunId of the Workflow Execution that scheduled the Activity Execution, or {@code null}
33+
* for standalone activities not scheduled by a workflow.
34+
* @deprecated use {@link #getWorkflowRunId()}
3035
*/
36+
@Deprecated
37+
@Nullable
3138
String getRunId();
3239

40+
/**
41+
* @return RunId of the Workflow Execution that scheduled the Activity Execution, or {@code null}
42+
* for standalone activities not scheduled by a workflow.
43+
*/
44+
@Nullable
45+
String getWorkflowRunId();
46+
47+
/**
48+
* @return the run ID of this standalone Activity Execution, or {@code null} for activities
49+
* scheduled by a workflow.
50+
*/
51+
@Nullable
52+
String getActivityRunId();
53+
3354
/**
3455
* ID of the Activity Execution. This ID can be used to complete the Activity Execution
3556
* asynchronously through {@link io.temporal.client.ActivityCompletionClient#complete(String,
@@ -82,8 +103,10 @@ public interface ActivityInfo {
82103
Optional<Payloads> getHeartbeatDetails();
83104

84105
/**
85-
* @return the Workflow Type of the Workflow Execution that executed the Activity.
106+
* @return the Workflow Type of the Workflow Execution that executed the Activity, or {@code null}
107+
* for standalone activities not scheduled by a workflow.
86108
*/
109+
@Nullable
87110
String getWorkflowType();
88111

89112
/**
@@ -92,10 +115,12 @@ public interface ActivityInfo {
92115
* same namespace, hence no need for different {@code getWorkflowNamespace()} and {@link
93116
* #getActivityNamespace()} methods.
94117
*
95-
* @return the Namespace of Workflow Execution that scheduled the Activity.
118+
* @return the Namespace of Workflow Execution that scheduled the Activity, or null for standalone
119+
* activities.
96120
* @deprecated use {@link #getNamespace()}
97121
*/
98122
@Deprecated
123+
@Nullable
99124
String getWorkflowNamespace();
100125

101126
/**
@@ -110,8 +135,19 @@ public interface ActivityInfo {
110135
@Deprecated
111136
String getActivityNamespace();
112137

138+
/**
139+
* @return the Namespace of this Activity Execution.
140+
*/
113141
String getNamespace();
114142

143+
/**
144+
* @return {@code true} if this activity was scheduled by a workflow execution; {@code false} for
145+
* standalone activities.
146+
*/
147+
default boolean isInWorkflow() {
148+
return getWorkflowId() != null;
149+
}
150+
115151
String getActivityTaskQueue();
116152

117153
/**
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.temporal.client;
2+
3+
import io.temporal.common.Experimental;
4+
import javax.annotation.Nullable;
5+
6+
/**
7+
* Thrown by {@link ActivityClient#start} when the server returns an ALREADY_EXISTS error because an
8+
* activity with the same ID is already running (or has a completed run that conflicts with the
9+
* requested {@link StartActivityOptions#getIdReusePolicy()} / {@link
10+
* StartActivityOptions#getIdConflictPolicy()}).
11+
*/
12+
@Experimental
13+
public final class ActivityAlreadyStartedException extends ActivityException {
14+
15+
private final String activityType;
16+
17+
public ActivityAlreadyStartedException(
18+
String activityId, String activityType, @Nullable String runId, Throwable cause) {
19+
super(
20+
"Activity already started: activityId='"
21+
+ activityId
22+
+ "', activityType='"
23+
+ activityType
24+
+ (runId != null ? "', runId='" + runId + "'" : "'"),
25+
activityId,
26+
runId,
27+
cause);
28+
this.activityType = activityType;
29+
}
30+
31+
/** The activity type that was requested. */
32+
public String getActivityType() {
33+
return activityType;
34+
}
35+
}

0 commit comments

Comments
 (0)