diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9dbb8d7928..54f03c912a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -102,6 +102,7 @@ jobs: --dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \ --dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \ --dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \ + --dynamic-config-value frontend.activityAPIsEnabled=true \ --dynamic-config-value system.enableDeploymentVersions=true & sleep 10s diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java new file mode 100644 index 0000000000..7318f9defe --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.client; + +import io.temporal.activity.ActivityInfo; + +/*** + * Indicates that the activity was paused by the user. + * + *
Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.
+ */
+public final class ActivityPausedException extends ActivityCompletionException {
+ public ActivityPausedException(ActivityInfo info) {
+ super(info);
+ }
+
+ public ActivityPausedException() {
+ super();
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java
index 07150f4298..846a4f60e6 100644
--- a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java
+++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java
@@ -26,7 +26,7 @@
/**
* Indicates that {@link WorkerFactory#shutdown()} or {@link WorkerFactory#shutdownNow()} was
- * called. It is OK to ignore the exception to let activity to complete. It assumes that {@link
+ * called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
* WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity
* execution time.
*/
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
index 8259770fec..db230591c1 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
@@ -216,6 +216,8 @@ private void sendHeartbeatRequest(Object details) {
metricsScope);
if (status.getCancelRequested()) {
lastException = new ActivityCanceledException(info);
+ } else if (status.getActivityPaused()) {
+ lastException = new ActivityPausedException(info);
} else {
lastException = null;
}
diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityPauseTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityPauseTest.java
new file mode 100644
index 0000000000..aeccd03761
--- /dev/null
+++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityPauseTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.activity;
+
+import static org.junit.Assume.assumeTrue;
+
+import io.temporal.api.common.v1.WorkflowExecution;
+import io.temporal.api.workflow.v1.PendingActivityInfo;
+import io.temporal.api.workflowservice.v1.PauseActivityRequest;
+import io.temporal.client.ActivityPausedException;
+import io.temporal.client.WorkflowStub;
+import io.temporal.testing.internal.SDKTestOptions;
+import io.temporal.testing.internal.SDKTestWorkflowRule;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.shared.TestActivities;
+import io.temporal.workflow.shared.TestWorkflows;
+import java.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ActivityPauseTest {
+
+ @Rule
+ public SDKTestWorkflowRule testWorkflowRule =
+ SDKTestWorkflowRule.newBuilder()
+ .setWorkflowTypes(TestWorkflowImpl.class)
+ .setActivityImplementations(new HeartBeatingActivityImpl())
+ .build();
+
+ @Test
+ public void activityPause() {
+ assumeTrue(
+ "Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService);
+
+ TestWorkflows.TestWorkflowReturnString workflow =
+ testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
+ Assert.assertEquals("I am stopped by Pause", workflow.execute());
+ Assert.assertEquals(
+ 1,
+ WorkflowStub.fromTyped(workflow)
+ .describe()
+ .getRawDescription()
+ .getPendingActivitiesCount());
+ PendingActivityInfo activityInfo =
+ WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0);
+ Assert.assertTrue(activityInfo.getPaused());
+ }
+
+ public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
+
+ private final TestActivities.TestActivity1 activities =
+ Workflow.newActivityStub(
+ TestActivities.TestActivity1.class,
+ SDKTestOptions.newActivityOptions20sScheduleToClose());
+
+ @Override
+ public String execute() {
+ Async.function(activities::execute, "");
+ Workflow.sleep(Duration.ofSeconds(1));
+ return activities.execute("CompleteOnPause");
+ }
+ }
+
+ public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 {
+ @Override
+ public String execute(String arg) {
+ ActivityInfo info = Activity.getExecutionContext().getInfo();
+ // Have the activity pause itself
+ Activity.getExecutionContext()
+ .getWorkflowClient()
+ .getWorkflowServiceStubs()
+ .blockingStub()
+ .pauseActivity(
+ PauseActivityRequest.newBuilder()
+ .setNamespace(info.getNamespace())
+ .setExecution(
+ WorkflowExecution.newBuilder().setWorkflowId(info.getWorkflowId()).build())
+ .setId(info.getActivityId())
+ .build());
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ // Heartbeat and verify that the correct exception is thrown
+ Activity.getExecutionContext().heartbeat("1");
+ } catch (ActivityPausedException pe) {
+ if (arg.equals("CompleteOnPause")) {
+ // An activity should be able to succeed if paused
+ return "I am stopped by Pause";
+ }
+ // This will fail the attempt, and the activity will not be retried if not unpaused
+ throw pe;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}