Skip to content

Commit 4578791

Browse files
committed
PR feedback: replace sleep with eventually, test against a real server, use a separate TaskCounter, and resolve TaskQueueType live rather than only at start
1 parent bfd3fc6 commit 4578791

16 files changed

Lines changed: 641 additions & 632 deletions

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ jobs:
108108
--dynamic-config-value history.MaxBufferedQueryCount=10000 \
109109
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
110110
--dynamic-config-value history.enableRequestIdRefLinks=true \
111+
--dynamic-config-value frontend.ListWorkersEnabled=true \
111112
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
112113
sleep 10s
113114

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
30-
import java.util.concurrent.atomic.AtomicInteger;
3130
import javax.annotation.Nonnull;
3231
import org.slf4j.Logger;
3332
import org.slf4j.LoggerFactory;
@@ -50,8 +49,7 @@ final class ActivityWorker implements SuspendableWorker {
5049
private final GrpcRetryer grpcRetryer;
5150
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5251
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
53-
private final AtomicInteger totalProcessedTasks = new AtomicInteger();
54-
private final AtomicInteger totalFailedTasks = new AtomicInteger();
52+
private final TaskCounter taskCounter = new TaskCounter();
5553
private final PollerTracker pollerTracker;
5654
private final AtomicBoolean serverSupportsAutoscaling;
5755

@@ -227,12 +225,8 @@ public TrackingSlotSupplier<ActivitySlotInfo> getSlotSupplier() {
227225
return slotSupplier;
228226
}
229227

230-
public AtomicInteger getTotalProcessedTasks() {
231-
return totalProcessedTasks;
232-
}
233-
234-
public AtomicInteger getTotalFailedTasks() {
235-
return totalFailedTasks;
228+
public TaskCounter getTaskCounter() {
229+
return taskCounter;
236230
}
237231

238232
public PollerOptions getPollerOptions() {
@@ -286,18 +280,22 @@ public void handle(ActivityTask task) throws Exception {
286280
MDC.put(LoggerTag.ATTEMPT, Integer.toString(pollResponse.getAttempt()));
287281

288282
ActivityTaskHandler.Result result = null;
283+
boolean taskFailed = false;
289284
try {
290285
result = handleActivity(task, metricsScope);
291-
totalProcessedTasks.incrementAndGet();
292286
if (result.getTaskFailed() != null
293287
&& !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure(
294288
result.getTaskFailed().getFailure())) {
295-
totalFailedTasks.incrementAndGet();
289+
taskFailed = true;
296290
}
297291
} catch (Exception e) {
298-
totalFailedTasks.incrementAndGet();
292+
taskFailed = true;
299293
throw e;
300294
} finally {
295+
taskCounter.recordProcessed();
296+
if (taskFailed) {
297+
taskCounter.recordFailed();
298+
}
301299
MDC.remove(LoggerTag.ACTIVITY_ID);
302300
MDC.remove(LoggerTag.ACTIVITY_TYPE);
303301
MDC.remove(LoggerTag.WORKFLOW_ID);

temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.temporal.internal.worker;
22

3-
import com.google.common.base.Preconditions;
43
import io.temporal.api.worker.v1.WorkerHeartbeat;
54
import io.temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest;
65
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -61,11 +60,10 @@ public void registerWorker(
6160
public void unregisterWorker(String namespace, String workerInstanceKey) {
6261
synchronized (lock) {
6362
SharedNamespaceWorker nsWorker = namespaceWorkers.get(namespace);
64-
Preconditions.checkState(
65-
nsWorker != null,
66-
"unregisterWorker called for unknown namespace %s, worker %s",
67-
namespace,
68-
workerInstanceKey);
63+
if (nsWorker == null) {
64+
// Already cleaned up by shutdown()
65+
return;
66+
}
6967
nsWorker.unregisterWorker(workerInstanceKey);
7068
if (nsWorker.isEmpty()) {
7169
nsWorker.shutdown();

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Objects;
3131
import java.util.Optional;
3232
import java.util.concurrent.*;
33-
import java.util.concurrent.atomic.AtomicInteger;
3433
import javax.annotation.Nonnull;
3534
import javax.annotation.Nullable;
3635
import org.slf4j.Logger;
@@ -48,8 +47,7 @@ final class LocalActivityWorker implements Startable, Shutdownable {
4847

4948
private final LocalActivityDispatcherImpl laScheduler;
5049

51-
private final AtomicInteger totalProcessedTasks = new AtomicInteger();
52-
private final AtomicInteger totalFailedTasks = new AtomicInteger();
50+
private final TaskCounter taskCounter = new TaskCounter();
5351
private final PollerOptions pollerOptions;
5452
private final Scope workerMetricsScope;
5553

@@ -400,6 +398,7 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
400398
LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
401399
executionContext.newAttempt();
402400
PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
401+
boolean taskFailed = false;
403402

404403
try {
405404
// if an activity was already completed by any means like scheduleToClose or scheduleToStart,
@@ -476,21 +475,24 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
476475
}
477476

478477
reason = handleResult(activityHandlerResult, attemptTask, metricsScope);
479-
totalProcessedTasks.incrementAndGet();
480478
if (activityHandlerResult.getTaskFailed() != null
481479
&& !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure(
482480
activityHandlerResult.getTaskFailed().getFailure())) {
483-
totalFailedTasks.incrementAndGet();
481+
taskFailed = true;
484482
}
485483
} catch (Throwable ex) {
486484
// handleLocalActivity is expected to never throw an exception and return a result
487485
// that can be used for a workflow callback. If this method throws, it's a bug.
488486
log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
489-
totalFailedTasks.incrementAndGet();
487+
taskFailed = true;
490488
executionContext.callback(
491489
processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex));
492490
throw ex;
493491
} finally {
492+
taskCounter.recordProcessed();
493+
if (taskFailed) {
494+
taskCounter.recordFailed();
495+
}
494496
slotSupplier.releaseSlot(reason, executionContext.getPermit());
495497
MDC.remove(LoggerTag.ACTIVITY_ID);
496498
MDC.remove(LoggerTag.ACTIVITY_TYPE);
@@ -766,12 +768,8 @@ public LocalActivityDispatcher getLocalActivityScheduler() {
766768
return laScheduler;
767769
}
768770

769-
public AtomicInteger getTotalProcessedTasks() {
770-
return totalProcessedTasks;
771-
}
772-
773-
public AtomicInteger getTotalFailedTasks() {
774-
return totalFailedTasks;
771+
public TaskCounter getTaskCounter() {
772+
return taskCounter;
775773
}
776774

777775
private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) {

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.TimeoutException;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34-
import java.util.concurrent.atomic.AtomicInteger;
3534
import javax.annotation.Nonnull;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
@@ -56,8 +55,7 @@ final class NexusWorker implements SuspendableWorker {
5655
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
5756
private final AtomicBoolean serverSupportsAutoscaling;
5857
private final boolean forceOldFailureFormat;
59-
private final AtomicInteger totalProcessedTasks = new AtomicInteger();
60-
private final AtomicInteger totalFailedTasks = new AtomicInteger();
58+
private final TaskCounter taskCounter = new TaskCounter();
6159
private final PollerTracker pollerTracker = new PollerTracker();
6260

6361
public NexusWorker(
@@ -224,12 +222,8 @@ public TrackingSlotSupplier<NexusSlotInfo> getSlotSupplier() {
224222
return slotSupplier;
225223
}
226224

227-
public AtomicInteger getTotalProcessedTasks() {
228-
return totalProcessedTasks;
229-
}
230-
231-
public AtomicInteger getTotalFailedTasks() {
232-
return totalFailedTasks;
225+
public TaskCounter getTaskCounter() {
226+
return taskCounter;
233227
}
234228

235229
public PollerOptions getPollerOptions() {
@@ -297,16 +291,17 @@ public void handle(NexusTask task) {
297291
service, operation, taskQueue, options.getIdentity(), options.getBuildId()),
298292
task.getPermit());
299293

294+
boolean taskFailed = false;
300295
try {
301-
boolean handlerFailed = handleNexusTask(task, metricsScope);
302-
totalProcessedTasks.incrementAndGet();
303-
if (handlerFailed) {
304-
totalFailedTasks.incrementAndGet();
305-
}
296+
taskFailed = handleNexusTask(task, metricsScope);
306297
} catch (Throwable e) {
307-
totalFailedTasks.incrementAndGet();
298+
taskFailed = true;
308299
throw e;
309300
} finally {
301+
taskCounter.recordProcessed();
302+
if (taskFailed) {
303+
taskCounter.recordFailed();
304+
}
310305
task.getCompletionCallback().apply();
311306
MDC.remove(LoggerTag.NEXUS_SERVICE);
312307
MDC.remove(LoggerTag.NEXUS_OPERATION);

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import java.util.concurrent.ScheduledExecutorService;
1313
import java.util.concurrent.TimeUnit;
1414
import java.util.concurrent.atomic.AtomicBoolean;
15-
import java.util.concurrent.atomic.AtomicInteger;
1615
import javax.annotation.Nonnull;
1716
import org.slf4j.Logger;
1817
import org.slf4j.LoggerFactory;
@@ -156,12 +155,8 @@ public TrackingSlotSupplier<ActivitySlotInfo> getSlotSupplier() {
156155
return worker.getSlotSupplier();
157156
}
158157

159-
public AtomicInteger getTotalProcessedTasks() {
160-
return worker.getTotalProcessedTasks();
161-
}
162-
163-
public AtomicInteger getTotalFailedTasks() {
164-
return worker.getTotalFailedTasks();
158+
public TaskCounter getTaskCounter() {
159+
return worker.getTaskCounter();
165160
}
166161

167162
public PollerOptions getPollerOptions() {

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.TimeUnit;
99
import java.util.concurrent.atomic.AtomicBoolean;
10-
import java.util.concurrent.atomic.AtomicInteger;
1110
import javax.annotation.Nonnull;
1211
import org.slf4j.Logger;
1312
import org.slf4j.LoggerFactory;
@@ -103,12 +102,8 @@ public TrackingSlotSupplier<NexusSlotInfo> getSlotSupplier() {
103102
return worker.getSlotSupplier();
104103
}
105104

106-
public AtomicInteger getTotalProcessedTasks() {
107-
return worker.getTotalProcessedTasks();
108-
}
109-
110-
public AtomicInteger getTotalFailedTasks() {
111-
return worker.getTotalFailedTasks();
105+
public TaskCounter getTaskCounter() {
106+
return worker.getTaskCounter();
112107
}
113108

114109
public PollerOptions getPollerOptions() {

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Optional;
3030
import java.util.concurrent.*;
3131
import java.util.concurrent.atomic.AtomicBoolean;
32-
import java.util.concurrent.atomic.AtomicInteger;
3332
import java.util.function.Supplier;
3433
import javax.annotation.Nonnull;
3534
import javax.annotation.Nullable;
@@ -67,7 +66,7 @@ public SyncWorkflowWorker(
6766
@Nonnull String namespace,
6867
@Nonnull String taskQueue,
6968
@Nonnull String workerInstanceKey,
70-
@Nonnull List<TaskQueueType> activeTaskQueueTypes,
69+
@Nonnull Supplier<List<TaskQueueType>> activeTaskQueueTypesSupplier,
7170
@Nonnull SingleWorkerOptions singleWorkerOptions,
7271
@Nonnull SingleWorkerOptions localActivityOptions,
7372
@Nonnull WorkflowRunLockManager runLocks,
@@ -126,7 +125,7 @@ public SyncWorkflowWorker(
126125
namespace,
127126
taskQueue,
128127
workerInstanceKey,
129-
activeTaskQueueTypes,
128+
activeTaskQueueTypesSupplier,
130129
stickyTaskQueueName,
131130
singleWorkerOptions,
132131
runLocks,
@@ -260,20 +259,12 @@ public boolean hasStickyQueue() {
260259
return workflowWorker.hasStickyQueue();
261260
}
262261

263-
public AtomicInteger getWorkflowTotalProcessedTasks() {
264-
return workflowWorker.getTotalProcessedTasks();
262+
public TaskCounter getWorkflowTaskCounter() {
263+
return workflowWorker.getTaskCounter();
265264
}
266265

267-
public AtomicInteger getWorkflowTotalFailedTasks() {
268-
return workflowWorker.getTotalFailedTasks();
269-
}
270-
271-
public AtomicInteger getLocalActivityTotalProcessedTasks() {
272-
return laWorker.getTotalProcessedTasks();
273-
}
274-
275-
public AtomicInteger getLocalActivityTotalFailedTasks() {
276-
return laWorker.getTotalFailedTasks();
266+
public TaskCounter getLocalActivityTaskCounter() {
267+
return laWorker.getTaskCounter();
277268
}
278269

279270
public PollerOptions getWorkflowPollerOptions() {

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.concurrent.RejectedExecutionException;
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicBoolean;
37-
import java.util.concurrent.atomic.AtomicInteger;
3837
import java.util.function.Supplier;
3938
import javax.annotation.Nonnull;
4039
import javax.annotation.Nullable;
@@ -62,10 +61,9 @@ final class WorkflowWorker implements SuspendableWorker {
6261
private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
6362
private volatile Supplier<WorkerHeartbeat> heartbeatSupplier;
6463
private final String workerInstanceKey;
65-
private final List<TaskQueueType> activeTaskQueueTypes;
64+
private final Supplier<List<TaskQueueType>> activeTaskQueueTypesSupplier;
6665

67-
private final AtomicInteger totalProcessedTasks = new AtomicInteger();
68-
private final AtomicInteger totalFailedTasks = new AtomicInteger();
66+
private final TaskCounter taskCounter = new TaskCounter();
6967
private final PollerTracker pollerTracker = new PollerTracker();
7068
private final PollerTracker stickyPollerTracker = new PollerTracker();
7169
private final AtomicBoolean serverSupportsAutoscaling;
@@ -83,7 +81,7 @@ public WorkflowWorker(
8381
@Nonnull String namespace,
8482
@Nonnull String taskQueue,
8583
@Nonnull String workerInstanceKey,
86-
@Nonnull List<TaskQueueType> activeTaskQueueTypes,
84+
@Nonnull Supplier<List<TaskQueueType>> activeTaskQueueTypesSupplier,
8785
@Nullable String stickyTaskQueueName,
8886
@Nonnull SingleWorkerOptions options,
8987
@Nonnull WorkflowRunLockManager runLocks,
@@ -96,7 +94,7 @@ public WorkflowWorker(
9694
this.namespace = Objects.requireNonNull(namespace);
9795
this.taskQueue = Objects.requireNonNull(taskQueue);
9896
this.workerInstanceKey = Objects.requireNonNull(workerInstanceKey);
99-
this.activeTaskQueueTypes = Objects.requireNonNull(activeTaskQueueTypes);
97+
this.activeTaskQueueTypesSupplier = Objects.requireNonNull(activeTaskQueueTypesSupplier);
10098
this.options = Objects.requireNonNull(options);
10199
this.stickyTaskQueueName = stickyTaskQueueName;
102100
this.pollerOptions = getPollerOptions(options);
@@ -245,7 +243,7 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
245243
.setTaskQueue(taskQueue)
246244
.setWorkerInstanceKey(workerInstanceKey)
247245
.setReason(GRACEFUL_SHUTDOWN_MESSAGE)
248-
.addAllTaskQueueTypes(activeTaskQueueTypes);
246+
.addAllTaskQueueTypes(activeTaskQueueTypesSupplier.get());
249247
if (stickyTaskQueueName != null) {
250248
shutdownReq.setStickyTaskQueue(stickyTaskQueueName);
251249
}
@@ -382,12 +380,8 @@ public String getStickyTaskQueueName() {
382380
return stickyTaskQueueName;
383381
}
384382

385-
public AtomicInteger getTotalProcessedTasks() {
386-
return totalProcessedTasks;
387-
}
388-
389-
public AtomicInteger getTotalFailedTasks() {
390-
return totalFailedTasks;
383+
public TaskCounter getTaskCounter() {
384+
return taskCounter;
391385
}
392386

393387
public PollerOptions getPollerOptions() {
@@ -613,9 +607,9 @@ public void handle(WorkflowTask task) throws Exception {
613607
MDC.remove(LoggerTag.WORKFLOW_TYPE);
614608
MDC.remove(LoggerTag.RUN_ID);
615609

616-
totalProcessedTasks.incrementAndGet();
610+
taskCounter.recordProcessed();
617611
if (taskProcessingFailed) {
618-
totalFailedTasks.incrementAndGet();
612+
taskCounter.recordFailed();
619613
}
620614

621615
if (locked) {

0 commit comments

Comments
 (0)