Skip to content

Commit 1dc3b70

Browse files
committed
Always shutdown fully, query taskHandler instead of keeping bools
1 parent ff47c66 commit 1dc3b70

8 files changed

Lines changed: 166 additions & 38 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ jobs:
108108
--dynamic-config-value history.MaxBufferedQueryCount=10000 \
109109
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
110110
--dynamic-config-value history.enableRequestIdRefLinks=true \
111+
--dynamic-config-value frontend.WorkerHeartbeatsEnabled=true \
111112
--dynamic-config-value frontend.ListWorkersEnabled=true \
112113
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
113114
sleep 10s

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public NexusTaskHandlerImpl(
6464
this.nexusServiceInterceptor = new TemporalInterceptorMiddleware(interceptors);
6565
}
6666

67+
public boolean isAnyTypeSupported() {
68+
return !serviceImplInstances.isEmpty();
69+
}
70+
6771
@Override
6872
public boolean start() {
6973
if (serviceImplInstances.isEmpty()) {

temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ boolean isShutdown() {
127127
}
128128

129129
void shutdown() {
130-
if (!shuttingDown.compareAndSet(false, true)) return;
130+
shuttingDown.set(true);
131131
scheduler.shutdownNow();
132132
try {
133133
scheduler.awaitTermination(5, TimeUnit.SECONDS);
@@ -139,30 +139,30 @@ void shutdown() {
139139
private void heartbeatTick() {
140140
if (callbacks.isEmpty()) return;
141141

142-
try {
143-
List<WorkerHeartbeat> heartbeats = new ArrayList<>();
144-
for (Map.Entry<String, Supplier<WorkerHeartbeat>> entry : callbacks.entrySet()) {
145-
try {
146-
heartbeats.add(entry.getValue().get());
147-
} catch (Exception e) {
148-
log.warn(
149-
"Failed to build heartbeat for worker {} in namespace {}",
150-
entry.getKey(),
151-
namespace,
152-
e);
153-
}
142+
List<WorkerHeartbeat> heartbeats = new ArrayList<>();
143+
for (Map.Entry<String, Supplier<WorkerHeartbeat>> entry : callbacks.entrySet()) {
144+
try {
145+
heartbeats.add(entry.getValue().get());
146+
} catch (Exception e) {
147+
log.warn(
148+
"Failed to build heartbeat for worker {} in namespace {}",
149+
entry.getKey(),
150+
namespace,
151+
e);
154152
}
153+
}
155154

156-
if (!heartbeats.isEmpty()) {
157-
service
158-
.blockingStub()
159-
.recordWorkerHeartbeat(
160-
RecordWorkerHeartbeatRequest.newBuilder()
161-
.setNamespace(namespace)
162-
.setIdentity(identity)
163-
.addAllWorkerHeartbeat(heartbeats)
164-
.build());
165-
}
155+
if (heartbeats.isEmpty()) return;
156+
157+
try {
158+
service
159+
.blockingStub()
160+
.recordWorkerHeartbeat(
161+
RecordWorkerHeartbeatRequest.newBuilder()
162+
.setNamespace(namespace)
163+
.setIdentity(identity)
164+
.addAllWorkerHeartbeat(heartbeats)
165+
.build());
166166
} catch (io.grpc.StatusRuntimeException e) {
167167
if (e.getStatus().getCode() == io.grpc.Status.Code.UNIMPLEMENTED) {
168168
log.warn(

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ public EagerActivityDispatcher getEagerActivityDispatcher() {
150150
return this.worker.getEagerActivityDispatcher();
151151
}
152152

153+
public boolean isAnyTypeSupported() {
154+
return taskHandler.isAnyTypeSupported();
155+
}
156+
153157
public TrackingSlotSupplier<ActivitySlotInfo> getSlotSupplier() {
154158
return worker.getSlotSupplier();
155159
}

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ public String toString() {
124124
"SyncNexusWorker{namespace=%s, taskQueue=%s, identity=%s}", namespace, taskQueue, identity);
125125
}
126126

127+
public boolean isAnyTypeSupported() {
128+
return taskHandler.isAnyTypeSupported();
129+
}
130+
127131
public void registerNexusServiceImplementation(Object... nexusServiceImplementations) {
128132
taskHandler.registerNexusServiceImplementations(nexusServiceImplementations);
129133
}

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ public boolean hasStickyQueue() {
258258
return workflowWorker.hasStickyQueue();
259259
}
260260

261+
public boolean isAnyTypeSupported() {
262+
return factory.isAnyTypeSupported();
263+
}
264+
261265
public TaskCounter getWorkflowTaskCounter() {
262266
return workflowWorker.getTaskCounter();
263267
}

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,11 @@ public final class Worker {
6565
final SyncNexusWorker nexusWorker;
6666
private final AtomicBoolean started = new AtomicBoolean();
6767
private volatile boolean shuttingDown = false;
68-
private boolean hasWorkflows = false;
69-
private boolean hasActivities = false;
70-
private boolean hasNexusServices = false;
7168
private final String workerInstanceKey = UUID.randomUUID().toString();
7269
private volatile Instant startTime;
7370
private final WorkflowClientOptions clientOptions;
7471
private final @Nonnull WorkflowExecutorCache cache;
75-
private final Map<String, TaskSnapshot> previousSnapshots = new ConcurrentHashMap<>();
72+
private final Map<String, TaskSnapshot> previousHeartbeatSnapshots = new ConcurrentHashMap<>();
7673

7774
private static final class TaskSnapshot {
7875
final int processed;
@@ -227,7 +224,6 @@ public void registerWorkflowImplementationTypes(Class<?>... workflowImplementati
227224

228225
workflowWorker.registerWorkflowImplementationTypes(
229226
WorkflowImplementationOptions.newBuilder().build(), workflowImplementationClasses);
230-
hasWorkflows = true;
231227
}
232228

233229
/**
@@ -252,7 +248,6 @@ public void registerWorkflowImplementationTypes(
252248
"registerWorkflowImplementationTypes is not allowed after worker has started");
253249

254250
workflowWorker.registerWorkflowImplementationTypes(options, workflowImplementationClasses);
255-
hasWorkflows = true;
256251
}
257252

258253
/**
@@ -354,7 +349,6 @@ public <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Fun
354349
public <R> void registerWorkflowImplementationFactory(
355350
Class<R> workflowInterface, Func<R> factory, WorkflowImplementationOptions options) {
356351
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
357-
hasWorkflows = true;
358352
}
359353

360354
@VisibleForTesting
@@ -363,7 +357,6 @@ public <R> void registerWorkflowImplementationFactory(
363357
Functions.Func1<EncodedValues, R> factory,
364358
WorkflowImplementationOptions options) {
365359
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
366-
hasWorkflows = true;
367360
}
368361

369362
/**
@@ -407,7 +400,6 @@ public <R> void registerWorkflowImplementationFactory(
407400
Class<R> workflowInterface, Func<R> factory) {
408401
workflowWorker.registerWorkflowImplementationFactory(
409402
WorkflowImplementationOptions.getDefaultInstance(), workflowInterface, factory);
410-
hasWorkflows = true;
411403
}
412404

413405
/**
@@ -434,7 +426,6 @@ public void registerActivitiesImplementations(Object... activityImplementations)
434426

435427
if (activityWorker != null) {
436428
activityWorker.registerActivityImplementations(activityImplementations);
437-
hasActivities = true;
438429
}
439430
workflowWorker.registerLocalActivityImplementations(activityImplementations);
440431
}
@@ -452,7 +443,6 @@ public void registerNexusServiceImplementation(Object... nexusServiceImplementat
452443
!started.get(),
453444
"registerNexusServiceImplementation is not allowed after worker has started");
454445
nexusWorker.registerNexusServiceImplementation(nexusServiceImplementations);
455-
hasNexusServices = true;
456446
}
457447

458448
void start() {
@@ -507,13 +497,13 @@ String getWorkerInstanceKey() {
507497

508498
List<TaskQueueType> getActiveTaskQueueTypes() {
509499
List<TaskQueueType> types = new ArrayList<>();
510-
if (hasWorkflows) {
500+
if (workflowWorker.isAnyTypeSupported()) {
511501
types.add(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW);
512502
}
513-
if (hasActivities) {
503+
if (activityWorker != null && activityWorker.isAnyTypeSupported()) {
514504
types.add(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY);
515505
}
516-
if (hasNexusServices) {
506+
if (nexusWorker.isAnyTypeSupported()) {
517507
types.add(TaskQueueType.TASK_QUEUE_TYPE_NEXUS);
518508
}
519509
return types;
@@ -630,7 +620,7 @@ private WorkerSlotsInfo buildSlotsInfo(
630620
int currentProcessed = taskCounter.getTotalProcessed();
631621
int currentFailed = taskCounter.getTotalFailed();
632622
TaskSnapshot previous =
633-
previousSnapshots.put(key, new TaskSnapshot(currentProcessed, currentFailed));
623+
previousHeartbeatSnapshots.put(key, new TaskSnapshot(currentProcessed, currentFailed));
634624
int intervalProcessed = previous != null ? currentProcessed - previous.processed : 0;
635625
int intervalFailed = previous != null ? currentFailed - previous.failed : 0;
636626
return WorkerSlotsInfo.newBuilder()
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package io.temporal.worker;
2+
3+
import static org.junit.Assert.*;
4+
5+
import io.nexusrpc.handler.OperationHandler;
6+
import io.nexusrpc.handler.OperationImpl;
7+
import io.nexusrpc.handler.ServiceImpl;
8+
import io.temporal.activity.ActivityInterface;
9+
import io.temporal.activity.ActivityMethod;
10+
import io.temporal.api.enums.v1.TaskQueueType;
11+
import io.temporal.testing.TestEnvironmentOptions;
12+
import io.temporal.testing.TestWorkflowEnvironment;
13+
import io.temporal.workflow.WorkflowInterface;
14+
import io.temporal.workflow.WorkflowMethod;
15+
import io.temporal.workflow.shared.TestNexusServices;
16+
import java.util.List;
17+
import org.junit.Test;
18+
19+
public class ActiveTaskQueueTypesTest {
20+
21+
@WorkflowInterface
22+
public interface TestWorkflow {
23+
@WorkflowMethod
24+
void run();
25+
}
26+
27+
public static class TestWorkflowImpl implements TestWorkflow {
28+
@Override
29+
public void run() {}
30+
}
31+
32+
@ActivityInterface
33+
public interface TestActivity {
34+
@ActivityMethod
35+
void doThing();
36+
}
37+
38+
public static class TestActivityImpl implements TestActivity {
39+
@Override
40+
public void doThing() {}
41+
}
42+
43+
@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
44+
public static class TestNexusServiceImpl {
45+
@OperationImpl
46+
public OperationHandler<String, String> operation() {
47+
return OperationHandler.sync((ctx, details, now) -> "Hello " + now);
48+
}
49+
}
50+
51+
@Test
52+
public void testNoRegistrations() {
53+
try (TestWorkflowEnvironment env = newEnv()) {
54+
Worker worker = env.newWorker("test-queue");
55+
List<TaskQueueType> types = worker.getActiveTaskQueueTypes();
56+
assertTrue("no types should be active without registrations", types.isEmpty());
57+
}
58+
}
59+
60+
@Test
61+
public void testWorkflowOnly() {
62+
try (TestWorkflowEnvironment env = newEnv()) {
63+
Worker worker = env.newWorker("test-queue");
64+
worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
65+
66+
List<TaskQueueType> types = worker.getActiveTaskQueueTypes();
67+
assertEquals(1, types.size());
68+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW));
69+
}
70+
}
71+
72+
@Test
73+
public void testWorkflowAndActivity() {
74+
try (TestWorkflowEnvironment env = newEnv()) {
75+
Worker worker = env.newWorker("test-queue");
76+
worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
77+
worker.registerActivitiesImplementations(new TestActivityImpl());
78+
79+
List<TaskQueueType> types = worker.getActiveTaskQueueTypes();
80+
assertEquals(2, types.size());
81+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW));
82+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY));
83+
}
84+
}
85+
86+
@Test
87+
public void testAllTypes() {
88+
try (TestWorkflowEnvironment env = newEnv()) {
89+
Worker worker = env.newWorker("test-queue");
90+
worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
91+
worker.registerActivitiesImplementations(new TestActivityImpl());
92+
worker.registerNexusServiceImplementation(new TestNexusServiceImpl());
93+
94+
List<TaskQueueType> types = worker.getActiveTaskQueueTypes();
95+
assertEquals(3, types.size());
96+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW));
97+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY));
98+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_NEXUS));
99+
}
100+
}
101+
102+
@Test
103+
public void testLocalActivityWorkerOnlyExcludesActivity() {
104+
try (TestWorkflowEnvironment env = newEnv()) {
105+
Worker worker =
106+
env.newWorker(
107+
"test-queue", WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build());
108+
worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
109+
worker.registerActivitiesImplementations(new TestActivityImpl());
110+
111+
List<TaskQueueType> types = worker.getActiveTaskQueueTypes();
112+
assertEquals(1, types.size());
113+
assertTrue(types.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW));
114+
assertFalse(types.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY));
115+
}
116+
}
117+
118+
private static TestWorkflowEnvironment newEnv() {
119+
return TestWorkflowEnvironment.newInstance(TestEnvironmentOptions.newBuilder().build());
120+
}
121+
}

0 commit comments

Comments
 (0)