Skip to content

Commit bc467e8

Browse files
committed
Cache for ActivityHandleImpl.getResultAsync.
1 parent 076073f commit bc467e8

2 files changed

Lines changed: 91 additions & 5 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/client/ActivityHandleImpl.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.lang.reflect.Type;
88
import java.util.concurrent.CompletableFuture;
99
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.atomic.AtomicReference;
1011
import javax.annotation.Nullable;
1112

1213
/**
@@ -18,6 +19,7 @@ public final class ActivityHandleImpl implements UntypedActivityHandle {
1819
private final String activityId;
1920
private final @Nullable String activityRunId;
2021
private final ActivityClientCallsInterceptor clientCallsInterceptor;
22+
private final AtomicReference<CompletableFuture<?>> noTimeoutResult = new AtomicReference<>();
2123

2224
public ActivityHandleImpl(
2325
String activityId,
@@ -72,11 +74,43 @@ public <R> CompletableFuture<R> getResultAsync(
7274
@Override
7375
public <R> CompletableFuture<R> getResultAsync(
7476
long timeout, TimeUnit unit, Class<R> resultClass, @Nullable Type resultType) {
75-
return clientCallsInterceptor
76-
.getActivityResultAsync(
77-
new ActivityClientCallsInterceptor.GetActivityResultInput<>(
78-
activityId, activityRunId, resultClass, resultType, timeout, unit))
79-
.thenApply(ActivityClientCallsInterceptor.GetActivityResultOutput::getResult);
77+
boolean noTimeout = timeout == Long.MAX_VALUE && unit == TimeUnit.MILLISECONDS;
78+
79+
CompletableFuture<?> cached = noTimeoutResult.get();
80+
if (cached != null && (noTimeout || cached.isDone())) {
81+
@SuppressWarnings("unchecked")
82+
CompletableFuture<R> typed = (CompletableFuture<R>) cached;
83+
return typed;
84+
}
85+
86+
CompletableFuture<R> newFuture =
87+
clientCallsInterceptor
88+
.getActivityResultAsync(
89+
new ActivityClientCallsInterceptor.GetActivityResultInput<>(
90+
activityId, activityRunId, resultClass, resultType, timeout, unit))
91+
.thenApply(ActivityClientCallsInterceptor.GetActivityResultOutput::getResult);
92+
93+
// When a timed call succeeds the activity is done; populate cache so future calls skip polling.
94+
newFuture.whenComplete(
95+
(r, ex) -> {
96+
if (ex == null) {
97+
noTimeoutResult.compareAndSet(null, newFuture);
98+
}
99+
});
100+
101+
if (noTimeout && noTimeoutResult.compareAndSet(null, newFuture)) {
102+
return newFuture;
103+
}
104+
105+
// Another thread raced us on the first no-timeout call; return the winner
106+
cached = noTimeoutResult.get();
107+
if (cached != null && (noTimeout || cached.isDone())) {
108+
@SuppressWarnings("unchecked")
109+
CompletableFuture<R> typed = (CompletableFuture<R>) cached;
110+
return typed;
111+
}
112+
113+
return newFuture;
80114
}
81115

82116
@Override

temporal-sdk/src/test/java/io/temporal/internal/client/ActivityHandleImplTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
1111
import io.temporal.common.interceptors.ActivityClientCallsInterceptor.*;
1212
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.TimeUnit;
1314
import org.junit.Before;
1415
import org.junit.Test;
1516

@@ -151,6 +152,57 @@ public void testFromUntypedGetResultAsyncNoArg() throws Exception {
151152
assertEquals("async-typed", future.get());
152153
}
153154

155+
@Test
156+
@SuppressWarnings("unchecked")
157+
public void testGetResultAsyncCachesNoTimeoutFuture() throws Exception {
158+
GetActivityResultOutput<String> output = mock(GetActivityResultOutput.class);
159+
when(output.getResult()).thenReturn("cached-result");
160+
when(interceptor.getActivityResultAsync(any(GetActivityResultInput.class)))
161+
.thenReturn(CompletableFuture.completedFuture(output));
162+
163+
UntypedActivityHandle handle = new ActivityHandleImpl("id", "run", interceptor);
164+
CompletableFuture<String> first = handle.getResultAsync(String.class);
165+
CompletableFuture<String> second = handle.getResultAsync(String.class);
166+
167+
assertSame(first, second);
168+
verify(interceptor, times(1)).getActivityResultAsync(any(GetActivityResultInput.class));
169+
}
170+
171+
@Test
172+
@SuppressWarnings("unchecked")
173+
public void testGetResultAsyncTimeoutReusesCompletedCache() throws Exception {
174+
GetActivityResultOutput<String> output = mock(GetActivityResultOutput.class);
175+
when(output.getResult()).thenReturn("done");
176+
when(interceptor.getActivityResultAsync(any(GetActivityResultInput.class)))
177+
.thenReturn(CompletableFuture.completedFuture(output));
178+
179+
UntypedActivityHandle handle = new ActivityHandleImpl("id", "run", interceptor);
180+
// Warm the cache with a no-timeout call
181+
handle.getResultAsync(String.class).get();
182+
// Timeout call should reuse the completed cache, not start a new poll
183+
CompletableFuture<String> timed = handle.getResultAsync(5, TimeUnit.SECONDS, String.class);
184+
assertEquals("done", timed.get());
185+
186+
verify(interceptor, times(1)).getActivityResultAsync(any(GetActivityResultInput.class));
187+
}
188+
189+
@Test
190+
@SuppressWarnings("unchecked")
191+
public void testGetResultAsyncTimeoutSuccessPopulatesCache() throws Exception {
192+
GetActivityResultOutput<String> output = mock(GetActivityResultOutput.class);
193+
when(output.getResult()).thenReturn("timed-result");
194+
when(interceptor.getActivityResultAsync(any(GetActivityResultInput.class)))
195+
.thenReturn(CompletableFuture.completedFuture(output));
196+
197+
UntypedActivityHandle handle = new ActivityHandleImpl("id", "run", interceptor);
198+
// First call: timed — succeeds, should populate cache
199+
handle.getResultAsync(5, TimeUnit.SECONDS, String.class).get();
200+
// Second call: no-timeout — should reuse the cache, not issue a second poll
201+
handle.getResultAsync(String.class).get();
202+
203+
verify(interceptor, times(1)).getActivityResultAsync(any(GetActivityResultInput.class));
204+
}
205+
154206
@Test
155207
@SuppressWarnings("unchecked")
156208
public void testGetResultAsyncWrapsActivityFailedExceptionInRuntimeException() throws Exception {

0 commit comments

Comments
 (0)