Skip to content

Commit 50c887a

Browse files
authored
Activity heartbeat throttling (#173)
* implement activity heartbeat throttling
* add more unit tests for activity heartbeat
* code format
* review feedback
* limit max heartbeat interval
1 parent 8f58742 commit 50c887a

File tree

7 files changed

+279
-44
lines changed

7 files changed

+279
-44
lines changed

src/main/java/com/uber/cadence/internal/sync/ActivityExecutionContextImpl.java

+85-22
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@
2929
import com.uber.cadence.client.ActivityNotExistsException;
3030
import com.uber.cadence.converter.DataConverter;
3131
import com.uber.cadence.serviceclient.IWorkflowService;
32-
import java.util.concurrent.CancellationException;
32+
import java.util.Optional;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.ScheduledFuture;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.locks.Lock;
37+
import java.util.concurrent.locks.ReentrantLock;
3338
import org.apache.thrift.TException;
3439
import org.slf4j.Logger;
3540
import org.slf4j.LoggerFactory;
@@ -43,32 +48,97 @@
4348
class ActivityExecutionContextImpl implements ActivityExecutionContext {
4449

4550
private static final Logger log = LoggerFactory.getLogger(ActivityExecutionContextImpl.class);
51+
private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000;
52+
private static final long MAX_HEARTBEAT_INTERVAL_MILLIS = 30000;
4653

4754
private final IWorkflowService service;
48-
4955
private final String domain;
50-
5156
private final ActivityTask task;
5257
private final DataConverter dataConverter;
5358
private boolean doNotCompleteOnReturn;
59+
private final long heartbeatIntervalMillis;
60+
private Optional<Object> lastDetails;
61+
private final ScheduledExecutorService heartbeatExecutor;
62+
private Lock lock = new ReentrantLock();
63+
private ScheduledFuture future;
64+
private ActivityCompletionException lastException;
5465

5566
/** Create an ActivityExecutionContextImpl with the given attributes. */
5667
ActivityExecutionContextImpl(
57-
IWorkflowService service, String domain, ActivityTask task, DataConverter dataConverter) {
68+
IWorkflowService service,
69+
String domain,
70+
ActivityTask task,
71+
DataConverter dataConverter,
72+
ScheduledExecutorService heartbeatExecutor) {
5873
this.domain = domain;
5974
this.service = service;
6075
this.task = task;
6176
this.dataConverter = dataConverter;
77+
this.heartbeatIntervalMillis =
78+
Math.min(
79+
(long) (0.8 * task.getHeartbeatTimeout().toMillis()), MAX_HEARTBEAT_INTERVAL_MILLIS);
80+
this.heartbeatExecutor = heartbeatExecutor;
6281
}
6382

64-
/**
65-
* @throws CancellationException
66-
* @see ActivityExecutionContext#recordActivityHeartbeat(Object)
67-
*/
83+
/** @see ActivityExecutionContext#recordActivityHeartbeat(Object) */
6884
@Override
6985
public void recordActivityHeartbeat(Object details) throws ActivityCompletionException {
70-
// TODO: call service with the specified minimal interval (through @ActivityExecutionOptions)
71-
// allowing more frequent calls of this method.
86+
lock.lock();
87+
try {
88+
// Always set lastDetails. A successful heartbeat will clear it.
89+
lastDetails = details == null ? Optional.empty() : Optional.of(details);
90+
91+
// Only do sync heartbeat if there is no such call scheduled.
92+
if (future == null) {
93+
doHeartBeat(details);
94+
}
95+
96+
if (lastException != null) {
97+
throw lastException;
98+
}
99+
} finally {
100+
lock.unlock();
101+
}
102+
}
103+
104+
private void doHeartBeat(Object details) {
105+
long nextHeartbeatDelay;
106+
try {
107+
sendHeartbeatRequest(details);
108+
// Clear lastDetails only if heartbeat succeeds.
109+
lastDetails = null;
110+
nextHeartbeatDelay = heartbeatIntervalMillis;
111+
} catch (TException e) {
112+
// Not rethrowing to not fail activity implementation on intermittent connection or Cadence
113+
// errors.
114+
log.warn("Heartbeat failed.", e);
115+
nextHeartbeatDelay = HEARTBEAT_RETRY_WAIT_MILLIS;
116+
}
117+
118+
scheduleNextHeartbeat(nextHeartbeatDelay);
119+
}
120+
121+
private void scheduleNextHeartbeat(long delay) {
122+
future =
123+
heartbeatExecutor.schedule(
124+
() -> {
125+
lock.lock();
126+
try {
127+
if (lastDetails != null) {
128+
Object details = lastDetails.orElse(null);
129+
doHeartBeat(details);
130+
} else {
131+
future = null;
132+
}
133+
} finally {
134+
lock.unlock();
135+
}
136+
},
137+
delay,
138+
TimeUnit.MILLISECONDS);
139+
}
140+
141+
private void sendHeartbeatRequest(Object details) throws TException {
72142
RecordActivityTaskHeartbeatRequest r = new RecordActivityTaskHeartbeatRequest();
73143
r.setTaskToken(task.getTaskToken());
74144
byte[] serialized = dataConverter.toData(details);
@@ -77,21 +147,14 @@ public void recordActivityHeartbeat(Object details) throws ActivityCompletionExc
77147
try {
78148
status = service.RecordActivityTaskHeartbeat(r);
79149
if (status.isCancelRequested()) {
80-
throw new ActivityCancelledException(task);
150+
lastException = new ActivityCancelledException(task);
151+
} else {
152+
lastException = null;
81153
}
82154
} catch (EntityNotExistsError e) {
83-
throw new ActivityNotExistsException(task, e);
155+
lastException = new ActivityNotExistsException(task, e);
84156
} catch (BadRequestError e) {
85-
throw new ActivityCompletionFailureException(task, e);
86-
} catch (TException e) {
87-
log.warn(
88-
"Failure heartbeating on activityID="
89-
+ task.getActivityId()
90-
+ " of Workflow="
91-
+ task.getWorkflowExecution(),
92-
e);
93-
// Not rethrowing to not fail activity implementation on intermittent connection or Cadence
94-
// errors.
157+
lastException = new ActivityCompletionFailureException(task, e);
95158
}
96159
}
97160

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,24 @@
4141
import java.util.HashMap;
4242
import java.util.Map;
4343
import java.util.concurrent.CancellationException;
44+
import java.util.concurrent.ScheduledExecutorService;
4445

4546
class POJOActivityTaskHandler implements ActivityTaskHandler {
4647

47-
private DataConverter dataConverter;
48+
private final DataConverter dataConverter;
49+
private final ScheduledExecutorService heartbeatExecutor;
4850
private final Map<String, POJOActivityImplementation> activities =
4951
Collections.synchronizedMap(new HashMap<>());
5052

51-
POJOActivityTaskHandler(DataConverter dataConverter) {
53+
POJOActivityTaskHandler(DataConverter dataConverter, ScheduledExecutorService heartbeatExecutor) {
5254
this.dataConverter = dataConverter;
55+
this.heartbeatExecutor = heartbeatExecutor;
5356
}
5457

5558
public DataConverter getDataConverter() {
5659
return dataConverter;
5760
}
5861

59-
public void setDataConverter(DataConverter dataConverter) {
60-
this.dataConverter = dataConverter;
61-
}
62-
6362
public void addActivityImplementation(Object activity) {
6463
Class<?> cls = activity.getClass();
6564
for (Method method : cls.getMethods()) {
@@ -183,7 +182,7 @@ private class POJOActivityImplementation {
183182
public ActivityTaskHandler.Result execute(
184183
IWorkflowService service, String domain, ActivityTaskImpl task, Scope metricsScope) {
185184
ActivityExecutionContext context =
186-
new ActivityExecutionContextImpl(service, domain, task, dataConverter);
185+
new ActivityExecutionContextImpl(service, domain, task, dataConverter, heartbeatExecutor);
187186
byte[] input = task.getInput();
188187
Object[] args = dataConverter.fromDataArray(input, method.getParameterTypes());
189188
CurrentActivityExecutionContext.set(context);

src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@
2020
import com.uber.cadence.internal.worker.ActivityWorker;
2121
import com.uber.cadence.internal.worker.SingleWorkerOptions;
2222
import com.uber.cadence.serviceclient.IWorkflowService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ScheduledExecutorService;
2325
import java.util.concurrent.TimeUnit;
2426

2527
/** Activity worker that supports POJO activity implementations. */
2628
public class SyncActivityWorker {
2729

2830
private final ActivityWorker worker;
2931
private final POJOActivityTaskHandler taskHandler;
32+
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
3033

3134
public SyncActivityWorker(
3235
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
33-
taskHandler = new POJOActivityTaskHandler(options.getDataConverter());
36+
taskHandler = new POJOActivityTaskHandler(options.getDataConverter(), heartbeatExecutor);
3437
worker = new ActivityWorker(service, domain, taskList, options, taskHandler);
3538
}
3639

@@ -44,19 +47,24 @@ public void start() {
4447

4548
public void shutdown() {
4649
worker.shutdown();
50+
heartbeatExecutor.shutdown();
4751
}
4852

4953
public void shutdownNow() {
5054
worker.shutdownNow();
55+
heartbeatExecutor.shutdownNow();
5156
}
5257

5358
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
54-
return worker.awaitTermination(timeout, unit);
59+
return worker.awaitTermination(timeout, unit)
60+
&& heartbeatExecutor.awaitTermination(timeout, unit);
5561
}
5662

5763
public boolean shutdownAndAwaitTermination(long timeout, TimeUnit unit)
5864
throws InterruptedException {
59-
return worker.shutdownAndAwaitTermination(timeout, unit);
65+
heartbeatExecutor.shutdownNow();
66+
return worker.shutdownAndAwaitTermination(timeout, unit)
67+
&& heartbeatExecutor.awaitTermination(timeout, unit);
6068
}
6169

6270
public boolean isRunning() {

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
import java.util.Random;
9191
import java.util.UUID;
9292
import java.util.concurrent.CancellationException;
93+
import java.util.concurrent.Executors;
94+
import java.util.concurrent.ScheduledExecutorService;
9395
import java.util.concurrent.atomic.AtomicInteger;
9496
import java.util.function.BiPredicate;
9597
import java.util.function.Consumer;
@@ -103,14 +105,18 @@ public final class TestActivityEnvironmentInternal implements TestActivityEnviro
103105
private final TestEnvironmentOptions testEnvironmentOptions;
104106
private final AtomicInteger idSequencer = new AtomicInteger();
105107
private ClassConsumerPair<Object> activityHeartbetListener;
108+
private static final ScheduledExecutorService heartbeatExecutor =
109+
Executors.newScheduledThreadPool(20);
110+
private IWorkflowService workflowService;
106111

107112
public TestActivityEnvironmentInternal(TestEnvironmentOptions options) {
108113
if (options == null) {
109114
this.testEnvironmentOptions = new TestEnvironmentOptions.Builder().build();
110115
} else {
111116
this.testEnvironmentOptions = options;
112117
}
113-
activityTaskHandler = new POJOActivityTaskHandler(testEnvironmentOptions.getDataConverter());
118+
activityTaskHandler =
119+
new POJOActivityTaskHandler(testEnvironmentOptions.getDataConverter(), heartbeatExecutor);
114120
}
115121

116122
/**
@@ -138,8 +144,7 @@ public <T> T newActivityStub(Class<T> activityInterface) {
138144
ActivityOptions options =
139145
new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofDays(1)).build();
140146
InvocationHandler invocationHandler =
141-
ActivityInvocationHandler.newInstance(
142-
options, new TestActivityExecutor(activityTaskHandler));
147+
ActivityInvocationHandler.newInstance(options, new TestActivityExecutor(workflowService));
143148
invocationHandler = new DeterministicRunnerWrapper(invocationHandler);
144149
return ActivityInvocationHandler.newProxy(activityInterface, invocationHandler);
145150
}
@@ -150,12 +155,17 @@ public <T> void setActivityHeartbeatListener(Class<T> detailsClass, Consumer<T>
150155
activityHeartbetListener = new ClassConsumerPair(detailsClass, listener);
151156
}
152157

158+
@Override
159+
public void setWorkflowService(IWorkflowService workflowService) {
160+
this.workflowService = workflowService;
161+
}
162+
153163
private class TestActivityExecutor implements WorkflowInterceptor {
154164

155-
private final POJOActivityTaskHandler taskHandler;
165+
private final IWorkflowService workflowService;
156166

157-
TestActivityExecutor(POJOActivityTaskHandler activityTaskHandler) {
158-
this.taskHandler = activityTaskHandler;
167+
TestActivityExecutor(IWorkflowService workflowService) {
168+
this.workflowService = workflowService;
159169
}
160170

161171
@Override
@@ -175,7 +185,7 @@ public <T> Promise<T> executeActivity(
175185
.setWorkflowId("test-workflow-id")
176186
.setRunId(UUID.randomUUID().toString()));
177187
task.setActivityType(new ActivityType().setName(activityType));
178-
IWorkflowService service = new WorkflowServiceWrapper(null);
188+
IWorkflowService service = new WorkflowServiceWrapper(workflowService);
179189
Result taskResult =
180190
activityTaskHandler.handle(
181191
service, testEnvironmentOptions.getDomain(), task, NoopScope.getInstance());

src/main/java/com/uber/cadence/testing/TestActivityEnvironment.java

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.uber.cadence.internal.sync.TestActivityEnvironmentInternal;
22+
import com.uber.cadence.serviceclient.IWorkflowService;
2223
import java.util.function.Consumer;
2324

2425
/**
@@ -85,4 +86,6 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) {
8586
* @param <T> Type of the heartbeat details.
8687
*/
8788
<T> void setActivityHeartbeatListener(Class<T> detailsClass, Consumer<T> listener);
89+
90+
void setWorkflowService(IWorkflowService workflowService);
8891
}

0 commit comments

Comments
 (0)