Skip to content

Commit 749a8c7

Browse files
committed
Use long poll token in describeActivity.
1 parent 3e99be2 commit 749a8c7

8 files changed

Lines changed: 105 additions & 11 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/ActivityExecutionDescription.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,13 @@ public final class ActivityExecutionDescription extends ActivityExecutionMetadat
3030
private final ActivityExecutionInfo info;
3131
private final DataConverter dataConverter;
3232
private final String namespace;
33+
private final @Nullable byte[] longPollToken;
3334

3435
public ActivityExecutionDescription(
35-
ActivityExecutionInfo info, DataConverter dataConverter, String namespace) {
36+
ActivityExecutionInfo info,
37+
DataConverter dataConverter,
38+
String namespace,
39+
@Nullable byte[] longPollToken) {
3640
super(
3741
null,
3842
info.getActivityId(),
@@ -51,6 +55,7 @@ public ActivityExecutionDescription(
5155
this.info = info;
5256
this.dataConverter = dataConverter;
5357
this.namespace = namespace;
58+
this.longPollToken = longPollToken;
5459
}
5560

5661
private static @Nullable String nullIfEmpty(String s) {
@@ -131,6 +136,15 @@ public String getLastWorkerIdentity() {
131136
return w.isEmpty() ? null : w;
132137
}
133138

139+
/**
140+
* Token for a follow-on {@link UntypedActivityHandle#describe(byte[])} call. Pass this token to
141+
* long-poll until the activity state changes. {@code null} when the activity is complete.
142+
*/
143+
@Nullable
144+
public byte[] getLongPollToken() {
145+
return longPollToken;
146+
}
147+
134148
/** Time when the next retry attempt will be scheduled. */
135149
@Nullable
136150
public Instant getNextAttemptScheduleTime() {

temporal-sdk/src/main/java/io/temporal/client/ActivityHandleWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ public ActivityExecutionDescription describe() {
6868
return delegate.describe();
6969
}
7070

71+
@Override
72+
public ActivityExecutionDescription describe(@Nullable byte[] longPollToken) {
73+
return delegate.describe(longPollToken);
74+
}
75+
7176
@Override
7277
public void cancel() {
7378
delegate.cancel();

temporal-sdk/src/main/java/io/temporal/client/UntypedActivityHandle.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,18 @@ public interface UntypedActivityHandle {
7171
*/
7272
ActivityExecutionDescription describe();
7373

74+
/**
75+
* Long-polls until the activity state changes from the state encoded in {@code longPollToken},
76+
* then returns the updated description. Pass the token from a previous {@link #describe()} call
77+
* via {@link ActivityExecutionDescription#getLongPollToken()}. If {@code longPollToken} is {@code
78+
* null}, returns the current state immediately (equivalent to {@link #describe()}).
79+
*
80+
* @param longPollToken token from a previous describe response, or {@code null} for an immediate
81+
* snapshot
82+
* @return updated description of the activity
83+
*/
84+
ActivityExecutionDescription describe(@Nullable byte[] longPollToken);
85+
7486
/**
7587
* Requests cancellation of the activity. The activity will receive a cancellation via {@link
7688
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)}.

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,17 @@ public R getResult() {
157157
final class DescribeActivityInput {
158158
private final String id;
159159
private final @Nullable String runId;
160+
private final @Nullable byte[] longPollToken;
160161

161162
public DescribeActivityInput(String id, @Nullable String runId) {
163+
this(id, runId, null);
164+
}
165+
166+
public DescribeActivityInput(
167+
String id, @Nullable String runId, @Nullable byte[] longPollToken) {
162168
this.id = id;
163169
this.runId = runId;
170+
this.longPollToken = longPollToken;
164171
}
165172

166173
public String getId() {
@@ -171,6 +178,11 @@ public String getId() {
171178
public String getRunId() {
172179
return runId;
173180
}
181+
182+
@Nullable
183+
public byte[] getLongPollToken() {
184+
return longPollToken;
185+
}
174186
}
175187

176188
@Experimental

temporal-sdk/src/main/java/io/temporal/internal/client/ActivityHandleImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,15 @@ public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable T
6464

6565
@Override
6666
public ActivityExecutionDescription describe() {
67+
return describe(null);
68+
}
69+
70+
@Override
71+
public ActivityExecutionDescription describe(@Nullable byte[] longPollToken) {
6772
return clientCallsInterceptor
6873
.describeActivity(
69-
new ActivityClientCallsInterceptor.DescribeActivityInput(activityId, activityRunId))
74+
new ActivityClientCallsInterceptor.DescribeActivityInput(
75+
activityId, activityRunId, longPollToken))
7076
.getDescription();
7177
}
7278

temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,18 @@ public DescribeActivityOutput describeActivity(DescribeActivityInput input) {
186186
if (input.getRunId() != null) {
187187
req.setRunId(input.getRunId());
188188
}
189+
if (input.getLongPollToken() != null) {
190+
req.setLongPollToken(ByteString.copyFrom(input.getLongPollToken()));
191+
}
189192
DescribeActivityExecutionResponse response = genericClient.describeActivity(req.build());
193+
byte[] token =
194+
response.getLongPollToken().isEmpty() ? null : response.getLongPollToken().toByteArray();
190195
return new DescribeActivityOutput(
191196
new ActivityExecutionDescription(
192-
response.getInfo(), clientOptions.getDataConverter(), clientOptions.getNamespace()));
197+
response.getInfo(),
198+
clientOptions.getDataConverter(),
199+
clientOptions.getNamespace(),
200+
token));
193201
}
194202

195203
@Override

temporal-sdk/src/test/java/io/temporal/client/ActivityExecutionDescriptionTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ private ActivityExecutionInfo buildInfo(String activityId, String runId) {
3838
@Test
3939
public void testNullRunIdWhenEmpty() {
4040
ActivityExecutionDescription desc =
41-
new ActivityExecutionDescription(buildInfo("act-id", ""), CONVERTER, "test-ns");
41+
new ActivityExecutionDescription(buildInfo("act-id", ""), CONVERTER, "test-ns", null);
4242
assertNull(desc.getActivityRunId());
4343
}
4444

4545
@Test
4646
public void testNullableFieldsAbsentByDefault() {
4747
ActivityExecutionDescription desc =
48-
new ActivityExecutionDescription(buildInfo("act-id", ""), CONVERTER, "test-ns");
48+
new ActivityExecutionDescription(buildInfo("act-id", ""), CONVERTER, "test-ns", null);
4949

5050
assertNull(desc.getCloseTime());
5151
assertNull(desc.getExecutionDuration());
@@ -69,14 +69,14 @@ public void testNullableFieldsAbsentByDefault() {
6969
@Test
7070
public void testScheduledTime() {
7171
ActivityExecutionDescription desc =
72-
new ActivityExecutionDescription(buildInfo("act-id", ""), CONVERTER, "test-ns");
72+
new ActivityExecutionDescription(buildInfo("act-id", ""), CONVERTER, "test-ns", null);
7373
assertEquals(Instant.ofEpochMilli(1000), desc.getScheduledTime());
7474
}
7575

7676
@Test
7777
public void testHasHeartbeatDetailsAbsent() {
7878
ActivityExecutionDescription desc =
79-
new ActivityExecutionDescription(buildInfo("id", "run"), CONVERTER, "test-ns");
79+
new ActivityExecutionDescription(buildInfo("id", "run"), CONVERTER, "test-ns", null);
8080
assertFalse(desc.hasHeartbeatDetails());
8181
assertFalse(desc.getHeartbeatDetails(String.class).isPresent());
8282
}
@@ -87,7 +87,7 @@ public void testGetHeartbeatDetailsPresent() {
8787
ActivityExecutionInfo info =
8888
buildInfo("id", "run").toBuilder().setHeartbeatDetails(encoded).build();
8989
ActivityExecutionDescription desc =
90-
new ActivityExecutionDescription(info, CONVERTER, "test-ns");
90+
new ActivityExecutionDescription(info, CONVERTER, "test-ns", null);
9191

9292
assertTrue(desc.hasHeartbeatDetails());
9393
Optional<String> result = desc.getHeartbeatDetails(String.class);
@@ -103,7 +103,7 @@ public void testGetHeartbeatDetailsWithExplicitGenericType() {
103103
ActivityExecutionInfo info =
104104
buildInfo("id", "run").toBuilder().setHeartbeatDetails(encoded).build();
105105
ActivityExecutionDescription desc =
106-
new ActivityExecutionDescription(info, CONVERTER, "test-ns");
106+
new ActivityExecutionDescription(info, CONVERTER, "test-ns", null);
107107

108108
Type genericType = new TypeToken<List<String>>() {}.getType();
109109
Class<List<String>> listClass = (Class<List<String>>) (Class<?>) List.class;
@@ -122,7 +122,7 @@ public void testGetWorkerDeploymentVersionPresent() {
122122
ActivityExecutionInfo info =
123123
buildInfo("id", "run").toBuilder().setLastDeploymentVersion(protoVersion).build();
124124
ActivityExecutionDescription desc =
125-
new ActivityExecutionDescription(info, CONVERTER, "test-ns");
125+
new ActivityExecutionDescription(info, CONVERTER, "test-ns", null);
126126

127127
WorkerDeploymentVersion version = desc.getWorkerDeploymentVersion();
128128
assertNotNull(version);
@@ -137,7 +137,7 @@ public void testGetPriorityPresent() {
137137
ActivityExecutionInfo info =
138138
buildInfo("id", "run").toBuilder().setPriority(protoPriority).build();
139139
ActivityExecutionDescription desc =
140-
new ActivityExecutionDescription(info, CONVERTER, "test-ns");
140+
new ActivityExecutionDescription(info, CONVERTER, "test-ns", null);
141141

142142
Priority priority = desc.getPriority();
143143
assertNotNull(priority);

temporal-sdk/src/test/java/io/temporal/client/functional/StandaloneActivityTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,43 @@ public void testDescribeUserMetadataIsAccurate() {
392392
assertEquals("Test details\nLine 2", desc.getStaticDetails());
393393
}
394394

395+
@Test
396+
public void testDescribeLongPollObservesCompletion() throws Exception {
397+
assumeTrue(SDKTestWorkflowRule.useExternalService);
398+
asyncStartLatch = new CountDownLatch(1);
399+
try {
400+
ActivityClient client = newActivityClient();
401+
ActivityHandle<String> handle =
402+
client.start(
403+
AsyncCompletionActivity.class,
404+
AsyncCompletionActivity::complete,
405+
simpleOpts(uniqueId()));
406+
407+
assertTrue("Activity did not start within 30s", asyncStartLatch.await(30, TimeUnit.SECONDS));
408+
409+
ActivityExecutionDescription desc = handle.describe();
410+
assertEquals(ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_RUNNING, desc.getStatus());
411+
assertNotNull("Running activity must have a long-poll token", desc.getLongPollToken());
412+
413+
// Long-poll in background — blocks until server detects a state change
414+
CompletableFuture<ActivityExecutionDescription> longPollFuture =
415+
CompletableFuture.supplyAsync(() -> handle.describe(desc.getLongPollToken()));
416+
417+
// Complete the activity externally so the server unblocks the poll
418+
client
419+
.newActivityCompletionClient()
420+
.completeStandalone(asyncActivityId, Optional.empty(), "long-poll-result");
421+
422+
ActivityExecutionDescription updated = longPollFuture.get(30, TimeUnit.SECONDS);
423+
assertEquals(
424+
ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_COMPLETED, updated.getStatus());
425+
} finally {
426+
asyncStartLatch = null;
427+
asyncActivityId = null;
428+
asyncActivityRunId = null;
429+
}
430+
}
431+
395432
@Test
396433
public void testCancelRunningActivitySucceeds() throws InterruptedException {
397434
assumeTrue(SDKTestWorkflowRule.useExternalService);

0 commit comments

Comments
 (0)