Skip to content

Commit bff9398

Browse files
committed
missed TaskCounter.java, pull in Spencer's changes temporalio#2820 to anticipate merge conflict
1 parent 9d9e294 commit bff9398

12 files changed

Lines changed: 93 additions & 51 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Optional;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.atomic.AtomicBoolean;
3029
import javax.annotation.Nonnull;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
@@ -51,7 +50,7 @@ final class ActivityWorker implements SuspendableWorker {
5150
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
5251
private final TaskCounter taskCounter = new TaskCounter();
5352
private final PollerTracker pollerTracker;
54-
private final AtomicBoolean serverSupportsAutoscaling;
53+
private final NamespaceCapabilities namespaceCapabilities;
5554

5655
public ActivityWorker(
5756
@Nonnull WorkflowServiceStubs service,
@@ -61,7 +60,7 @@ public ActivityWorker(
6160
@Nonnull SingleWorkerOptions options,
6261
@Nonnull ActivityTaskHandler handler,
6362
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier,
64-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
63+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
6564
this.service = Objects.requireNonNull(service);
6665
this.namespace = Objects.requireNonNull(namespace);
6766
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -78,7 +77,7 @@ public ActivityWorker(
7877

7978
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
8079
this.pollerTracker = new PollerTracker();
81-
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
80+
this.namespaceCapabilities = namespaceCapabilities;
8281
}
8382

8483
@Override
@@ -114,7 +113,7 @@ public boolean start() {
114113
pollerTracker),
115114
this.pollTaskExecutor,
116115
pollerOptions,
117-
serverSupportsAutoscaling.get(),
116+
namespaceCapabilities.isPollerAutoscaling(),
118117
workerMetricsScope);
119118

120119
} else {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.temporal.internal.worker;
2+
3+
import java.util.concurrent.atomic.AtomicBoolean;
4+
5+
/**
6+
* Holds namespace-level capabilities discovered from the server's DescribeNamespace response. A
7+
* single instance is shared across all workers in a WorkerFactory and is populated at startup. Uses
8+
* AtomicBooleans so capabilities can be set after construction.
9+
*/
10+
public final class NamespaceCapabilities {
11+
private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false);
12+
private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false);
13+
14+
public boolean isPollerAutoscaling() {
15+
return pollerAutoscaling.get();
16+
}
17+
18+
public void setPollerAutoscaling(boolean value) {
19+
pollerAutoscaling.set(value);
20+
}
21+
22+
public boolean isWorkerHeartbeats() {
23+
return workerHeartbeats.get();
24+
}
25+
26+
public void setWorkerHeartbeats(boolean value) {
27+
workerHeartbeats.set(value);
28+
}
29+
}

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.TimeoutException;
33-
import java.util.concurrent.atomic.AtomicBoolean;
3433
import javax.annotation.Nonnull;
3534
import org.slf4j.Logger;
3635
import org.slf4j.LoggerFactory;
@@ -53,7 +52,7 @@ final class NexusWorker implements SuspendableWorker {
5352
private final GrpcRetryer grpcRetryer;
5453
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5554
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
56-
private final AtomicBoolean serverSupportsAutoscaling;
55+
private final NamespaceCapabilities namespaceCapabilities;
5756
private final boolean forceOldFailureFormat;
5857
private final TaskCounter taskCounter = new TaskCounter();
5958
private final PollerTracker pollerTracker = new PollerTracker();
@@ -66,7 +65,7 @@ public NexusWorker(
6665
@Nonnull NexusTaskHandler handler,
6766
@Nonnull DataConverter dataConverter,
6867
@Nonnull SlotSupplier<NexusSlotInfo> slotSupplier,
69-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
68+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
7069
this.service = Objects.requireNonNull(service);
7170
this.namespace = Objects.requireNonNull(namespace);
7271
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -82,7 +81,7 @@ public NexusWorker(
8281
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
8382

8483
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
85-
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
84+
this.namespaceCapabilities = namespaceCapabilities;
8685
// Allow tests to force old format for backward compatibility testing
8786
String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat");
8887
this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat);
@@ -119,7 +118,7 @@ public boolean start() {
119118
pollerTracker),
120119
this.pollTaskExecutor,
121120
pollerOptions,
122-
serverSupportsAutoscaling.get(),
121+
namespaceCapabilities.isPollerAutoscaling(),
123122
workerMetricsScope);
124123
} else {
125124
poller =

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.concurrent.Executors;
1212
import java.util.concurrent.ScheduledExecutorService;
1313
import java.util.concurrent.TimeUnit;
14-
import java.util.concurrent.atomic.AtomicBoolean;
1514
import javax.annotation.Nonnull;
1615
import org.slf4j.Logger;
1716
import org.slf4j.LoggerFactory;
@@ -35,7 +34,7 @@ public SyncActivityWorker(
3534
double taskQueueActivitiesPerSecond,
3635
SingleWorkerOptions options,
3736
SlotSupplier<ActivitySlotInfo> slotSupplier,
38-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
37+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
3938
this.identity = options.getIdentity();
4039
this.namespace = namespace;
4140
this.taskQueue = taskQueue;
@@ -76,7 +75,7 @@ public SyncActivityWorker(
7675
options,
7776
taskHandler,
7877
slotSupplier,
79-
serverSupportsAutoscaling);
78+
namespaceCapabilities);
8079
}
8180

8281
public void registerActivityImplementations(Object... activitiesImplementation) {

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import io.temporal.worker.tuning.SlotSupplier;
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.TimeUnit;
9-
import java.util.concurrent.atomic.AtomicBoolean;
109
import javax.annotation.Nonnull;
1110
import org.slf4j.Logger;
1211
import org.slf4j.LoggerFactory;
@@ -26,7 +25,7 @@ public SyncNexusWorker(
2625
String taskQueue,
2726
SingleWorkerOptions options,
2827
SlotSupplier<NexusSlotInfo> slotSupplier,
29-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
28+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
3029
this.identity = options.getIdentity();
3130
this.namespace = namespace;
3231
this.taskQueue = taskQueue;
@@ -47,7 +46,7 @@ public SyncNexusWorker(
4746
taskHandler,
4847
options.getDataConverter(),
4948
slotSupplier,
50-
serverSupportsAutoscaling);
49+
namespaceCapabilities);
5150
}
5251

5352
@Override

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Objects;
2929
import java.util.Optional;
3030
import java.util.concurrent.*;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3231
import java.util.function.Supplier;
3332
import javax.annotation.Nonnull;
3433
import javax.annotation.Nullable;
@@ -76,7 +75,7 @@ public SyncWorkflowWorker(
7675
@Nonnull EagerActivityDispatcher eagerActivityDispatcher,
7776
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
7877
@Nonnull SlotSupplier<LocalActivitySlotInfo> laSlotSupplier,
79-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
78+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
8079
this.identity = singleWorkerOptions.getIdentity();
8180
this.namespace = namespace;
8281
this.taskQueue = taskQueue;
@@ -133,7 +132,7 @@ public SyncWorkflowWorker(
133132
taskHandler,
134133
eagerActivityDispatcher,
135134
slotSupplier,
136-
serverSupportsAutoscaling);
135+
namespaceCapabilities);
137136

138137
// Exists to support Worker#replayWorkflowExecution functionality.
139138
// This handler has to be non-sticky to avoid evicting actual executions from the cache
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.temporal.internal.worker;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
/** Tracks total processed and failed task counts for a worker. */
6+
public final class TaskCounter {
7+
private final AtomicInteger totalProcessed = new AtomicInteger();
8+
private final AtomicInteger totalFailed = new AtomicInteger();
9+
10+
void recordProcessed() {
11+
totalProcessed.incrementAndGet();
12+
}
13+
14+
void recordFailed() {
15+
totalFailed.incrementAndGet();
16+
}
17+
18+
public int getTotalProcessed() {
19+
return totalProcessed.get();
20+
}
21+
22+
public int getTotalFailed() {
23+
return totalFailed.get();
24+
}
25+
}

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.RejectedExecutionException;
3535
import java.util.concurrent.TimeUnit;
36-
import java.util.concurrent.atomic.AtomicBoolean;
3736
import java.util.function.Supplier;
3837
import javax.annotation.Nonnull;
3938
import javax.annotation.Nullable;
@@ -66,7 +65,7 @@ final class WorkflowWorker implements SuspendableWorker {
6665
private final TaskCounter taskCounter = new TaskCounter();
6766
private final PollerTracker pollerTracker = new PollerTracker();
6867
private final PollerTracker stickyPollerTracker = new PollerTracker();
69-
private final AtomicBoolean serverSupportsAutoscaling;
68+
private final NamespaceCapabilities namespaceCapabilities;
7069

7170
private PollTaskExecutor<WorkflowTask> pollTaskExecutor;
7271

@@ -89,7 +88,7 @@ public WorkflowWorker(
8988
@Nonnull WorkflowTaskHandler handler,
9089
@Nonnull EagerActivityDispatcher eagerActivityDispatcher,
9190
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
92-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
91+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
9392
this.service = Objects.requireNonNull(service);
9493
this.namespace = Objects.requireNonNull(namespace);
9594
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -106,7 +105,7 @@ public WorkflowWorker(
106105
this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
107106
this.eagerActivityDispatcher = eagerActivityDispatcher;
108107
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
109-
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
108+
this.namespaceCapabilities = namespaceCapabilities;
110109
}
111110

112111
@Override
@@ -176,7 +175,7 @@ public boolean start() {
176175
pollers,
177176
this.pollTaskExecutor,
178177
pollerOptions,
179-
serverSupportsAutoscaling.get(),
178+
namespaceCapabilities.isPollerAutoscaling(),
180179
workerMetricsScope);
181180
} else {
182181
PollerBehaviorSimpleMaximum pollerBehavior =

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private static final class TaskSnapshot {
104104
WorkflowThreadExecutor workflowThreadExecutor,
105105
List<ContextPropagator> contextPropagators,
106106
@Nonnull List<WorkerPlugin> plugins,
107-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
107+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
108108

109109
Objects.requireNonNull(client, "client should not be null");
110110
this.plugins = Objects.requireNonNull(plugins, "plugins should not be null");
@@ -140,7 +140,7 @@ private static final class TaskSnapshot {
140140
this.options.getMaxTaskQueueActivitiesPerSecond(),
141141
activityOptions,
142142
activitySlotSupplier,
143-
serverSupportsAutoscaling);
143+
namespaceCapabilities);
144144
}
145145

146146
EagerActivityDispatcher eagerActivityDispatcher =
@@ -159,12 +159,7 @@ private static final class TaskSnapshot {
159159

160160
nexusWorker =
161161
new SyncNexusWorker(
162-
client,
163-
namespace,
164-
taskQueue,
165-
nexusOptions,
166-
nexusSlotSupplier,
167-
serverSupportsAutoscaling);
162+
client, namespace, taskQueue, nexusOptions, nexusSlotSupplier, namespaceCapabilities);
168163

169164
SingleWorkerOptions singleWorkerOptions =
170165
toWorkflowWorkerOptions(
@@ -205,7 +200,7 @@ private static final class TaskSnapshot {
205200
eagerActivityDispatcher,
206201
workflowSlotSupplier,
207202
localActivitySlotSupplier,
208-
serverSupportsAutoscaling);
203+
namespaceCapabilities);
209204
}
210205

211206
/**

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.temporal.internal.sync.WorkflowThreadExecutor;
1616
import io.temporal.internal.task.VirtualThreadDelegate;
1717
import io.temporal.internal.worker.HeartbeatManager;
18+
import io.temporal.internal.worker.NamespaceCapabilities;
1819
import io.temporal.internal.worker.ShutdownManager;
1920
import io.temporal.internal.worker.WorkflowExecutorCache;
2021
import io.temporal.internal.worker.WorkflowRunLockManager;
@@ -31,7 +32,6 @@
3132
import java.util.concurrent.SynchronousQueue;
3233
import java.util.concurrent.ThreadPoolExecutor;
3334
import java.util.concurrent.TimeUnit;
34-
import java.util.concurrent.atomic.AtomicBoolean;
3535
import java.util.concurrent.atomic.AtomicInteger;
3636
import java.util.function.BiConsumer;
3737
import java.util.function.Consumer;
@@ -61,11 +61,10 @@ public final class WorkerFactory {
6161
/** Plugins propagated from the client and applied to this factory. */
6262
private final List<WorkerPlugin> plugins;
6363

64-
/** Set during start() if the namespace has the poller_autoscaling capability. */
65-
private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false);
64+
/** Namespace capabilities populated during start() from DescribeNamespace response. */
65+
private final NamespaceCapabilities namespaceCapabilities = new NamespaceCapabilities();
6666

6767
private State state = State.Initial;
68-
private boolean heartbeatsSupported;
6968

7069
private final String statusErrorMessage =
7170
"attempted to %s while in %s state. Acceptable States: %s";
@@ -202,7 +201,7 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) {
202201
workflowThreadExecutor,
203202
workflowClient.getOptions().getContextPropagators(),
204203
plugins,
205-
pollerAutoscaling);
204+
namespaceCapabilities);
206205
workers.put(taskQueue, worker);
207206

208207
// Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows,
@@ -269,17 +268,16 @@ public synchronized void start() {
269268
DescribeNamespaceRequest.newBuilder()
270269
.setNamespace(workflowClient.getOptions().getNamespace())
271270
.build());
272-
boolean heartbeatsSupported =
273-
describeNamespaceResponse.getNamespaceInfo().getCapabilities().getWorkerHeartbeats();
274-
if (!heartbeatsSupported) {
271+
if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getWorkerHeartbeats()) {
272+
namespaceCapabilities.setWorkerHeartbeats(true);
273+
} else {
275274
log.debug(
276275
"Server does not support worker heartbeats for namespace {}",
277276
workflowClient.getOptions().getNamespace());
278277
}
279-
this.heartbeatsSupported = heartbeatsSupported;
280278

281279
if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) {
282-
pollerAutoscaling.set(true);
280+
namespaceCapabilities.setPollerAutoscaling(true);
283281
}
284282

285283
// Build plugin execution chain (reverse order for proper nesting)
@@ -316,7 +314,7 @@ private void doStart() {
316314
// Register heartbeat callbacks after workers are started.
317315
WorkflowClientInternal clientInternal = (WorkflowClientInternal) workflowClient.getInternal();
318316
HeartbeatManager hbManager = clientInternal.getHeartbeatManager();
319-
if (hbManager != null && heartbeatsSupported) {
317+
if (hbManager != null && namespaceCapabilities.isWorkerHeartbeats()) {
320318
String namespace = workflowClient.getOptions().getNamespace();
321319
String workerGroupingKey = clientInternal.getWorkerGroupingKey();
322320
for (Worker worker : workers.values()) {
@@ -447,7 +445,7 @@ private void doShutdown(boolean interruptUserTasks) {
447445
r -> {
448446
// Unregister workers from heartbeat manager only after full shutdown,
449447
// so heartbeats continue reporting SHUTTING_DOWN until the worker is fully stopped.
450-
if (heartbeatsSupported) {
448+
if (namespaceCapabilities.isWorkerHeartbeats()) {
451449
HeartbeatManager hbManager =
452450
((WorkflowClientInternal) workflowClient.getInternal()).getHeartbeatManager();
453451
if (hbManager != null) {

0 commit comments

Comments
 (0)