Skip to content

Commit f3dae0c

Browse files
authored
Handle Self Eviction - enable sticky by default (#219)
1 parent c606a13 commit f3dae0c

25 files changed

+316
-164
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ googleJavaFormat {
3737
}
3838

3939
group = 'com.uber.cadence'
40-
version = '2.0.0'
40+
version = '2.1.0'
4141

4242
description = """Uber Cadence Java Client"""
4343

src/main/java/com/uber/cadence/internal/common/AsyncBackoffThrottler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public AsyncBackoffThrottler(
9191

9292
private long calculateSleepTime() {
9393
double sleepMillis =
94-
(Math.pow(backoffCoefficient, failureCount.get() - 1)) * initialSleep.toMillis();
94+
Math.pow(backoffCoefficient, failureCount.get() - 1) * initialSleep.toMillis();
9595
if (maxSleep != null) {
9696
return Math.min((long) sleepMillis, maxSleep.toMillis());
9797
}

src/main/java/com/uber/cadence/internal/common/BackoffThrottler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public BackoffThrottler(Duration initialSleep, Duration maxSleep, double backoff
7878

7979
private long calculateSleepTime() {
8080
double sleepMillis =
81-
(Math.pow(backoffCoefficient, failureCount.get() - 1)) * initialSleep.toMillis();
81+
Math.pow(backoffCoefficient, failureCount.get() - 1) * initialSleep.toMillis();
8282
if (maxSleep != null) {
8383
return Math.min((long) sleepMillis, maxSleep.toMillis());
8484
}

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

+2
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,6 @@ public class MetricsType {
140140
CADENCE_METRICS_PREFIX + "sticky-cache-thread-forced-eviction";
141141
public static final String STICKY_CACHE_STALL = CADENCE_METRICS_PREFIX + "sticky-cache-stall";
142142
public static final String STICKY_CACHE_SIZE = CADENCE_METRICS_PREFIX + "sticky-cache-size";
143+
public static final String WORKFLOW_ACTIVE_THREAD_COUNT =
144+
CADENCE_METRICS_PREFIX + "workflow_active_thread_count";
143145
}

src/main/java/com/uber/cadence/internal/replay/Decider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ public interface Decider {
2929
byte[] query(PollForDecisionTaskResponse decisionTask, WorkflowQuery query) throws Throwable;
3030

3131
void close();
32-
}
32+
}

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

+33-46
Original file line numberDiff line numberDiff line change
@@ -21,47 +21,45 @@
2121
import com.google.common.cache.CacheBuilder;
2222
import com.google.common.cache.CacheLoader;
2323
import com.google.common.cache.LoadingCache;
24-
import com.google.common.cache.Weigher;
2524
import com.uber.cadence.PollForDecisionTaskResponse;
2625
import com.uber.cadence.internal.common.ThrowableFunc1;
2726
import com.uber.cadence.internal.metrics.MetricsType;
2827
import com.uber.m3.tally.Scope;
28+
import java.util.Iterator;
2929
import java.util.Objects;
3030
import java.util.Random;
31-
import java.util.UUID;
31+
import java.util.Set;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.locks.Lock;
3434
import java.util.concurrent.locks.ReentrantLock;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3537

3638
public final class DeciderCache {
37-
private final String evictionEntryId = UUID.randomUUID().toString();
38-
private final int maxCacheSize;
3939
private final Scope metricsScope;
40-
private LoadingCache<String, WeightedCacheEntry<Decider>> cache;
40+
private LoadingCache<String, Decider> cache;
4141
private Lock evictionLock = new ReentrantLock();
4242
Random rand = new Random();
4343

44+
private static final Logger log = LoggerFactory.getLogger(DeciderCache.class);
45+
4446
public DeciderCache(int maxCacheSize, Scope scope) {
4547
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
46-
this.maxCacheSize = maxCacheSize;
4748
this.metricsScope = Objects.requireNonNull(scope);
4849
this.cache =
4950
CacheBuilder.newBuilder()
50-
.maximumWeight(maxCacheSize)
51-
.concurrencyLevel(1)
52-
.weigher(
53-
(Weigher<String, WeightedCacheEntry<Decider>>) (key, value) -> value.getWeight())
51+
.maximumSize(maxCacheSize)
5452
.removalListener(
5553
e -> {
56-
Decider entry = e.getValue().entry;
54+
Decider entry = (Decider) e.getValue();
5755
if (entry != null) {
5856
entry.close();
5957
}
6058
})
6159
.build(
62-
new CacheLoader<String, WeightedCacheEntry<Decider>>() {
60+
new CacheLoader<String, Decider>() {
6361
@Override
64-
public WeightedCacheEntry<Decider> load(String key) {
62+
public Decider load(String key) {
6563
return null;
6664
}
6765
});
@@ -75,16 +73,14 @@ public Decider getOrCreate(
7573
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
7674
if (isFullHistory(decisionTask)) {
7775
invalidate(decisionTask);
78-
return cache.get(
79-
runId, () -> new WeightedCacheEntry<>(createReplayDecider.apply(decisionTask), 1))
80-
.entry;
76+
return cache.get(runId, () -> createReplayDecider.apply(decisionTask));
8177
}
8278
return getUnchecked(runId);
8379
}
8480

8581
public Decider getUnchecked(String runId) throws Exception {
8682
try {
87-
Decider cachedDecider = cache.getUnchecked(runId).entry;
83+
Decider cachedDecider = cache.getUnchecked(runId);
8884
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
8985
return cachedDecider;
9086
} catch (CacheLoader.InvalidCacheLoadException e) {
@@ -93,22 +89,32 @@ public Decider getUnchecked(String runId) throws Exception {
9389
}
9490
}
9591

96-
public void evictNext() throws InterruptedException {
92+
public void evictAny(String runId) throws InterruptedException {
9793
// Timeout is to guard against workflows trying to evict each other.
9894
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
9995
return;
10096
}
10197
try {
10298
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
103-
int remainingSpace = (int) (maxCacheSize - cache.size());
104-
// Force eviction to happen. This assumes a concurrency level of 1 which implies a single
105-
// underlying segment and lock. If higher concurrency levels are assumed this may not work
106-
// since
107-
// the weight could be greater than the segment size and put will simply noop.
108-
// ConcurrenyLevel limits cache modification but reads and cache loading computations still
109-
// have concurrently.
110-
cache.put(evictionEntryId, new WeightedCacheEntry<>(null, remainingSpace + 1));
111-
invalidate(evictionEntryId);
99+
Set<String> set = cache.asMap().keySet();
100+
if (set.isEmpty()) {
101+
return;
102+
}
103+
Iterator<String> iter = cache.asMap().keySet().iterator();
104+
String key = "";
105+
while (iter.hasNext()) {
106+
key = iter.next();
107+
if (!key.equals(runId)) {
108+
break;
109+
}
110+
}
111+
112+
if (key.equals(runId)) {
113+
log.warn(String.format("%s attempted to self evict. Ignoring eviction", runId));
114+
return;
115+
}
116+
cache.invalidate(key);
117+
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
112118
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
113119
} finally {
114120
evictionLock.unlock();
@@ -146,25 +152,6 @@ public void invalidateAll() {
146152
cache.invalidateAll();
147153
}
148154

149-
// Used for eviction
150-
private static class WeightedCacheEntry<T> {
151-
private T entry;
152-
private int weight;
153-
154-
private WeightedCacheEntry(T entry, int weight) {
155-
this.entry = entry;
156-
this.weight = weight;
157-
}
158-
159-
public T getEntry() {
160-
return entry;
161-
}
162-
163-
public int getWeight() {
164-
return weight;
165-
}
166-
}
167-
168155
public static class EvictedException extends Exception {
169156

170157
public EvictedException(String runId) {

src/main/java/com/uber/cadence/internal/replay/DecisionId.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public boolean equals(Object o) {
4343
if (this == o) {
4444
return true;
4545
}
46-
if (o == null || getClass() != o.getClass()) {
46+
if (o == null || !(o instanceof DecisionId)) {
4747
return false;
4848
}
4949

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.converter.DataConverter;
2323
import com.uber.cadence.converter.JsonDataConverter;
2424
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
25+
import com.uber.cadence.internal.metrics.NoopScope;
2526
import com.uber.cadence.internal.replay.ContinueAsNewWorkflowExecutionParameters;
2627
import com.uber.cadence.internal.replay.DeciderCache;
2728
import com.uber.cadence.internal.replay.DecisionContext;
@@ -290,12 +291,14 @@ public void close() {
290291
List<Future<?>> threadFutures = new ArrayList<>();
291292
lock.lock();
292293
if (closed) {
294+
lock.unlock();
293295
return;
294296
}
295297
// Do not close while runUntilAllBlocked executes.
296298
// closeRequested tells it to call close() at the end.
297299
closeRequested = true;
298300
if (inRunUntilAllBlocked) {
301+
lock.unlock();
299302
return;
300303
}
301304
try {
@@ -343,10 +346,10 @@ public void close() {
343346

344347
@Override
345348
public String stackTrace() {
346-
lock.lock();
347-
checkClosed();
348349
StringBuilder result = new StringBuilder();
350+
lock.lock();
349351
try {
352+
checkClosed();
350353
for (WorkflowThread coroutine : threads) {
351354
if (result.length() > 0) {
352355
result.append("\n");
@@ -415,8 +418,8 @@ WorkflowThread newThread(Runnable runnable, boolean detached, String name) {
415418
@Override
416419
public void executeInWorkflowThread(String name, Runnable runnable) {
417420
lock.lock();
418-
checkClosed();
419421
try {
422+
checkClosed();
420423
toExecuteInWorkflowThread.add(new NamedRunnable(name, runnable));
421424
} finally {
422425
lock.unlock();
@@ -594,7 +597,7 @@ public Random newRandom() {
594597

595598
@Override
596599
public Scope getMetricsScope() {
597-
throw new UnsupportedOperationException("not implemented");
600+
return NoopScope.getInstance();
598601
}
599602

600603
@Override

src/main/java/com/uber/cadence/internal/sync/WorkflowRetryerInternal.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private static long calculateSleepTime(long attempt, RetryOptions options) {
164164
? DEFAULT_COEFFICIENT
165165
: options.getBackoffCoefficient();
166166
double sleepMillis =
167-
(Math.pow(backoffCoefficient, attempt - 1)) * options.getInitialInterval().toMillis();
167+
Math.pow(backoffCoefficient, attempt - 1) * options.getInitialInterval().toMillis();
168168
Duration maximumInterval = options.getMaximumInterval();
169169
if (maximumInterval == null) {
170170
return (long)

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import org.slf4j.LoggerFactory;
3535
import org.slf4j.MDC;
3636

37-
import static javafx.scene.input.KeyCode.T;
38-
3937
class WorkflowThreadImpl implements WorkflowThread {
4038

4139
/**
@@ -228,6 +226,11 @@ public void start() {
228226
}
229227
context.setStatus(Status.RUNNING);
230228

229+
getDecisionContext()
230+
.getMetricsScope()
231+
.gauge(MetricsType.STICKY_CACHE_SIZE)
232+
.update(((ThreadPoolExecutor) threadPool).getActiveCount());
233+
231234
try {
232235
taskFuture = threadPool.submit(task);
233236
return;
@@ -238,7 +241,7 @@ public void start() {
238241
.inc(1);
239242
try {
240243
if (cache != null) {
241-
cache.evictNext();
244+
cache.evictAny(this.runner.getDecisionContext().getContext().getRunId());
242245
}
243246
} catch (InterruptedException e1) {
244247
log.warn("Unable to evict cache", e1);
@@ -348,13 +351,13 @@ public Future<?> stopNow() {
348351
throw new RuntimeException(
349352
"Couldn't destroy the thread. " + "The blocked thread stack trace: " + getStackTrace());
350353
}
351-
if(taskFuture == null){
352-
return getCompletedFuture();
354+
if (taskFuture == null) {
355+
return getCompletedFuture();
353356
}
354357
return taskFuture;
355358
}
356359

357-
private Future<?> getCompletedFuture(){
360+
private Future<?> getCompletedFuture() {
358361
CompletableFuture<String> f = new CompletableFuture<>();
359362
f.complete("done");
360363
return f;

src/main/java/com/uber/cadence/internal/testservice/StateMachine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public boolean equals(Object o) {
6969
if (this == o) {
7070
return true;
7171
}
72-
if (o == null || getClass() != o.getClass()) {
72+
if (o == null || !(o instanceof Transition)) {
7373
return false;
7474
}
7575

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ private void processStartTimer(
697697
throws BadRequestError, InternalServiceError {
698698
String timerId = a.getTimerId();
699699
if (timerId == null) {
700-
throw new BadRequestError(("A valid TimerId is not set on StartTimerDecision"));
700+
throw new BadRequestError("A valid TimerId is not set on StartTimerDecision");
701701
}
702702
StateMachine<TimerData> timer = timers.get(timerId);
703703
if (timer != null) {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStore.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public boolean equals(Object o) {
6161
if (this == o) {
6262
return true;
6363
}
64-
if (o == null || getClass() != o.getClass()) {
64+
65+
if (o == null || !(o instanceof TaskListId)) {
6566
return false;
6667
}
6768

src/main/java/com/uber/cadence/internal/testservice/WorkflowId.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public boolean equals(Object o) {
4242
if (this == o) {
4343
return true;
4444
}
45-
if (o == null || getClass() != o.getClass()) {
45+
if (o == null || !(o instanceof WorkflowId)) {
4646
return false;
4747
}
4848

src/main/java/com/uber/cadence/internal/worker/ExecutorThreadFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ExecutorThreadFactory implements ThreadFactory {
3535
@Override
3636
public Thread newThread(Runnable r) {
3737
Thread result = new Thread(r);
38-
result.setName(threadPrefix + ": " + (threadIndex.incrementAndGet()));
38+
result.setName(threadPrefix + ": " + threadIndex.incrementAndGet());
3939
result.setUncaughtExceptionHandler(uncaughtExceptionHandler);
4040
return result;
4141
}

src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcherFactory.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
public final class PollDecisionTaskDispatcherFactory
2626
implements DispatcherFactory<String, PollForDecisionTaskResponse> {
27-
private IWorkflowService service = new WorkflowServiceTChannel();
27+
private IWorkflowService service;
2828
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
2929

3030
public PollDecisionTaskDispatcherFactory(
@@ -37,8 +37,6 @@ public PollDecisionTaskDispatcherFactory(IWorkflowService service) {
3737
this.service = Objects.requireNonNull(service);
3838
}
3939

40-
public PollDecisionTaskDispatcherFactory() {}
41-
4240
@Override
4341
public Dispatcher<String, PollForDecisionTaskResponse> create() {
4442
return new PollDecisionTaskDispatcher(service, uncaughtExceptionHandler);

src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java

+4
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ public TestEnvironmentOptions build() {
9292
metricsScope = NoopScope.getInstance();
9393
}
9494

95+
if (factoryOptions == null) {
96+
factoryOptions = new Worker.FactoryOptions.Builder().Build();
97+
}
98+
9599
return new TestEnvironmentOptions(
96100
dataConverter,
97101
domain,

0 commit comments

Comments
 (0)