Skip to content

Commit b35bde2

Browse files
committed
Review comments
1 parent 08e73bc commit b35bde2

4 files changed

Lines changed: 17 additions & 23 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
6565
// When graceful poll shutdown is enabled, the server will complete outstanding polls with
6666
// empty responses after ShutdownWorker is called. We simply wait for polls to return.
6767
pollExecutorShutdown =
68-
shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor");
68+
shutdownManager.shutdownExecutor(
69+
pollExecutor, this + "#pollExecutor", Duration.ofSeconds(80));
6970
} else {
7071
// Old behaviour forcibly stops outstanding polls.
7172
pollExecutorShutdown =

temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.internal.worker;
22

3+
import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities;
34
import java.util.concurrent.atomic.AtomicBoolean;
45

56
/**
@@ -13,12 +14,17 @@ public final class NamespaceCapabilities {
1314
private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false);
1415

1516

16-
public boolean isPollerAutoscaling() {
17-
return pollerAutoscaling.get();
17+
public void setFromCapabilities(Capabilities capabilities) {
18+
if (capabilities.getPollerAutoscaling()) {
19+
pollerAutoscaling.set(true);
20+
}
21+
if (capabilities.getWorkerPollCompleteOnShutdown()) {
22+
gracefulPollShutdown.set(true);
23+
}
1824
}
1925

20-
public void setPollerAutoscaling(boolean value) {
21-
pollerAutoscaling.set(value);
26+
public boolean isPollerAutoscaling() {
27+
return pollerAutoscaling.get();
2228
}
2329

2430
public boolean isGracefulPollShutdown() {

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -268,23 +268,8 @@ public synchronized void start() {
268268
DescribeNamespaceRequest.newBuilder()
269269
.setNamespace(workflowClient.getOptions().getNamespace())
270270
.build());
271-
if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getWorkerHeartbeats()) {
272-
namespaceCapabilities.setWorkerHeartbeats(true);
273-
} else {
274-
log.debug(
275-
"Server does not support worker heartbeats for namespace {}",
276-
workflowClient.getOptions().getNamespace());
277-
}
278-
279-
if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) {
280-
namespaceCapabilities.setPollerAutoscaling(true);
281-
}
282-
if (describeNamespaceResponse
283-
.getNamespaceInfo()
284-
.getCapabilities()
285-
.getWorkerPollCompleteOnShutdown()) {
286-
namespaceCapabilities.setGracefulPollShutdown(true);
287-
}
271+
namespaceCapabilities.setFromCapabilities(
272+
describeNamespaceResponse.getNamespaceInfo().getCapabilities());
288273

289274
// Build plugin execution chain (reverse order for proper nesting)
290275
Consumer<WorkerFactory> startChain = WorkerFactory::doStart;

temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.Assert.*;
44

55
import com.uber.m3.tally.NoopScope;
6+
import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities;
67
import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum;
78
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.CountDownLatch;
@@ -30,7 +31,8 @@ public static Object[] data() {
3031
@Test(timeout = 10_000)
3132
public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception {
3233
NamespaceCapabilities capabilities = new NamespaceCapabilities();
33-
capabilities.setGracefulPollShutdown(graceful);
34+
capabilities.setFromCapabilities(
35+
Capabilities.newBuilder().setWorkerPollCompleteOnShutdown(true).build());
3436

3537
AtomicReference<String> processedTask = new AtomicReference<>();
3638
CountDownLatch taskProcessedLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)