Skip to content

Commit b846d71

Browse files
committed
Fix merge issues
1 parent a4bd6a7 commit b846d71

8 files changed

Lines changed: 159 additions & 129 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public final class NamespaceCapabilities {
1313
private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false);
1414
private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false);
1515

16-
1716
public void setFromCapabilities(Capabilities capabilities) {
1817
if (capabilities.getPollerAutoscaling()) {
1918
pollerAutoscaling.set(true);
@@ -34,7 +33,7 @@ public boolean isGracefulPollShutdown() {
3433
public void setGracefulPollShutdown(boolean value) {
3534
gracefulPollShutdown.set(value);
3635
}
37-
36+
3837
public boolean isWorkerHeartbeats() {
3938
return workerHeartbeats.get();
4039
}

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
44

55
import io.temporal.api.common.v1.Payloads;
6-
import io.temporal.api.enums.v1.TaskQueueType;
76
import io.temporal.api.taskqueue.v1.TaskQueue;
8-
import io.temporal.api.worker.v1.WorkerHeartbeat;
97
import io.temporal.client.WorkflowClient;
108
import io.temporal.common.converter.DataConverter;
119
import io.temporal.common.converter.EncodedValues;
@@ -24,11 +22,9 @@
2422
import io.temporal.workflow.Functions.Func1;
2523
import java.lang.reflect.Type;
2624
import java.time.Duration;
27-
import java.util.List;
2825
import java.util.Objects;
2926
import java.util.Optional;
3027
import java.util.concurrent.*;
31-
import java.util.function.Supplier;
3228
import javax.annotation.Nonnull;
3329
import javax.annotation.Nullable;
3430
import org.slf4j.Logger;
@@ -64,8 +60,6 @@ public SyncWorkflowWorker(
6460
@Nonnull WorkflowClient client,
6561
@Nonnull String namespace,
6662
@Nonnull String taskQueue,
67-
@Nonnull String workerInstanceKey,
68-
@Nonnull Supplier<List<TaskQueueType>> activeTaskQueueTypesSupplier,
6963
@Nonnull SingleWorkerOptions singleWorkerOptions,
7064
@Nonnull SingleWorkerOptions localActivityOptions,
7165
@Nonnull WorkflowRunLockManager runLocks,
@@ -123,8 +117,6 @@ public SyncWorkflowWorker(
123117
client.getWorkflowServiceStubs(),
124118
namespace,
125119
taskQueue,
126-
workerInstanceKey,
127-
activeTaskQueueTypesSupplier,
128120
stickyTaskQueueName,
129121
singleWorkerOptions,
130122
runLocks,
@@ -250,10 +242,6 @@ public TrackingSlotSupplier<LocalActivitySlotInfo> getLocalActivitySlotSupplier(
250242
return laWorker.getSlotSupplier();
251243
}
252244

253-
public void setHeartbeatSupplier(Supplier<WorkerHeartbeat> supplier) {
254-
workflowWorker.setHeartbeatSupplier(supplier);
255-
}
256-
257245
public boolean hasStickyQueue() {
258246
return workflowWorker.hasStickyQueue();
259247
}

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@
1313
import io.temporal.api.common.v1.WorkflowExecution;
1414
import io.temporal.api.enums.v1.QueryResultType;
1515
import io.temporal.api.enums.v1.TaskQueueKind;
16-
import io.temporal.api.enums.v1.TaskQueueType;
17-
import io.temporal.api.enums.v1.WorkerStatus;
1816
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
1917
import io.temporal.api.failure.v1.Failure;
20-
import io.temporal.api.worker.v1.WorkerHeartbeat;
2118
import io.temporal.api.workflowservice.v1.*;
2219
import io.temporal.failure.ApplicationFailure;
2320
import io.temporal.internal.logging.LoggerTag;
@@ -33,7 +30,6 @@
3330
import java.util.concurrent.CompletableFuture;
3431
import java.util.concurrent.RejectedExecutionException;
3532
import java.util.concurrent.TimeUnit;
36-
import java.util.function.Supplier;
3733
import javax.annotation.Nonnull;
3834
import javax.annotation.Nullable;
3935
import org.slf4j.Logger;
@@ -57,9 +53,6 @@ final class WorkflowWorker implements SuspendableWorker {
5753
private final GrpcRetryer grpcRetryer;
5854
private final EagerActivityDispatcher eagerActivityDispatcher;
5955
private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
60-
private volatile Supplier<WorkerHeartbeat> heartbeatSupplier;
61-
private final String workerInstanceKey;
62-
private final Supplier<List<TaskQueueType>> activeTaskQueueTypesSupplier;
6356

6457
private final TaskCounter taskCounter = new TaskCounter();
6558
private final PollerTracker pollerTracker = new PollerTracker();
@@ -78,8 +71,6 @@ public WorkflowWorker(
7871
@Nonnull WorkflowServiceStubs service,
7972
@Nonnull String namespace,
8073
@Nonnull String taskQueue,
81-
@Nonnull String workerInstanceKey,
82-
@Nonnull Supplier<List<TaskQueueType>> activeTaskQueueTypesSupplier,
8374
@Nullable String stickyTaskQueueName,
8475
@Nonnull SingleWorkerOptions options,
8576
@Nonnull WorkflowRunLockManager runLocks,
@@ -91,8 +82,6 @@ public WorkflowWorker(
9182
this.service = Objects.requireNonNull(service);
9283
this.namespace = Objects.requireNonNull(namespace);
9384
this.taskQueue = Objects.requireNonNull(taskQueue);
94-
this.workerInstanceKey = Objects.requireNonNull(workerInstanceKey);
95-
this.activeTaskQueueTypesSupplier = Objects.requireNonNull(activeTaskQueueTypesSupplier);
9685
this.options = Objects.requireNonNull(options);
9786
this.stickyTaskQueueName = stickyTaskQueueName;
9887
this.pollerOptions = getPollerOptions(options);
@@ -344,10 +333,6 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
344333
.orElse(null);
345334
}
346335

347-
public void setHeartbeatSupplier(Supplier<WorkerHeartbeat> supplier) {
348-
this.heartbeatSupplier = supplier;
349-
}
350-
351336
public TrackingSlotSupplier<WorkflowSlotInfo> getSlotSupplier() {
352337
return slotSupplier;
353338
}

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import io.temporal.internal.worker.*;
2828
import io.temporal.internal.worker.TaskCounter;
2929
import io.temporal.serviceclient.MetricsTag;
30-
import io.temporal.serviceclient.WorkflowServiceStubs;
3130
import io.temporal.serviceclient.Version;
31+
import io.temporal.serviceclient.WorkflowServiceStubs;
3232
import io.temporal.worker.tuning.*;
3333
import io.temporal.workflow.Functions;
3434
import io.temporal.workflow.Functions.Func;
@@ -67,17 +67,16 @@ public final class Worker {
6767
private final String namespace;
6868
private final String identity;
6969
private final String stickyTaskQueueName;
70-
private final NamespaceCapabilities namespaceCapabilities;
7170
final SyncWorkflowWorker workflowWorker;
7271
final SyncActivityWorker activityWorker;
7372
final SyncNexusWorker nexusWorker;
7473
private final AtomicBoolean started = new AtomicBoolean();
7574
private volatile boolean shuttingDown = false;
76-
private final String workerInstanceKey = UUID.randomUUID().toString();
7775
private volatile Instant startTime;
7876
private final WorkflowClientOptions clientOptions;
7977
private final @Nonnull WorkflowExecutorCache cache;
8078
private final Map<String, TaskSnapshot> previousHeartbeatSnapshots = new ConcurrentHashMap<>();
79+
private volatile Supplier<WorkerHeartbeat> heartbeatSupplier;
8180

8281
private static final class TaskSnapshot {
8382
final int processed;
@@ -114,8 +113,7 @@ private static final class TaskSnapshot {
114113
@Nonnull NamespaceCapabilities namespaceCapabilities) {
115114

116115
Objects.requireNonNull(client, "client should not be null");
117-
this.namespaceCapabilities =
118-
Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null");
116+
Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null");
119117
this.plugins = Objects.requireNonNull(plugins, "plugins should not be null");
120118
Preconditions.checkArgument(
121119
!Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string");
@@ -220,8 +218,6 @@ private static final class TaskSnapshot {
220218
client,
221219
namespace,
222220
taskQueue,
223-
workerInstanceKey,
224-
this::getActiveTaskQueueTypes,
225221
singleWorkerOptions,
226222
localActivityOptions,
227223
runLocks,
@@ -493,14 +489,16 @@ CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interr
493489
.setWorkerInstanceKey(workerInstanceKey)
494490
.setTaskQueue(taskQueue)
495491
.setReason("graceful shutdown")
496-
.addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)
497-
.addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_NEXUS);
498-
if (activityWorker != null) {
499-
requestBuilder.addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY);
500-
}
492+
.addAllTaskQueueTypes(getActiveTaskQueueTypes());
501493
if (stickyTaskQueueName != null) {
502494
requestBuilder.setStickyTaskQueue(stickyTaskQueueName);
503495
}
496+
if (heartbeatSupplier != null) {
497+
requestBuilder.setWorkerHeartbeat(
498+
heartbeatSupplier.get().toBuilder()
499+
.setStatus(WorkerStatus.WORKER_STATUS_SHUTTING_DOWN)
500+
.build());
501+
}
504502
CompletableFuture<Void> shutdownWorkerRpc =
505503
shutdownManager.waitOnWorkerShutdownRequest(
506504
service.futureStub().shutdownWorker(requestBuilder.build()));
@@ -550,6 +548,10 @@ String getWorkerInstanceKey() {
550548
return workerInstanceKey;
551549
}
552550

551+
void setHeartbeatSupplier(Supplier<WorkerHeartbeat> supplier) {
552+
this.heartbeatSupplier = supplier;
553+
}
554+
553555
List<TaskQueueType> getActiveTaskQueueTypes() {
554556
List<TaskQueueType> types = new ArrayList<>();
555557
if (workflowWorker.isAnyTypeSupported()) {

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ private void doStart() {
312312
Supplier<WorkerHeartbeat> heartbeatSupplier =
313313
worker.buildHeartbeatCallback(workerGroupingKey);
314314
hbManager.registerWorker(namespace, worker.getWorkerInstanceKey(), heartbeatSupplier);
315-
worker.workflowWorker.setHeartbeatSupplier(heartbeatSupplier);
315+
worker.setHeartbeatSupplier(heartbeatSupplier);
316316
}
317317
}
318318

temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import java.util.concurrent.CountDownLatch;
1010
import java.util.concurrent.TimeUnit;
1111
import java.util.concurrent.atomic.AtomicReference;
12-
import org.checkerframework.checker.nullness.qual.NonNull;
12+
import javax.annotation.Nonnull;
1313
import org.junit.Test;
1414
import org.junit.runner.RunWith;
1515
import org.junit.runners.Parameterized;
@@ -39,7 +39,7 @@ public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception {
3939
ShutdownableTaskExecutor<String> taskExecutor =
4040
new ShutdownableTaskExecutor<String>() {
4141
@Override
42-
public void process(@NonNull String task) {
42+
public void process(@Nonnull String task) {
4343
processedTask.set(task);
4444
taskProcessedLatch.countDown();
4545
}

temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import com.uber.m3.util.ImmutableMap;
1515
import io.temporal.api.common.v1.WorkflowExecution;
1616
import io.temporal.api.common.v1.WorkflowType;
17-
import io.temporal.api.enums.v1.TaskQueueType;
1817
import io.temporal.api.workflowservice.v1.*;
1918
import io.temporal.common.reporter.TestStatsReporter;
2019
import io.temporal.internal.common.InternalUtils;
@@ -30,12 +29,8 @@
3029
import io.temporal.worker.tuning.SlotSupplier;
3130
import io.temporal.worker.tuning.WorkflowSlotInfo;
3231
import java.time.Duration;
33-
import java.util.Arrays;
34-
import java.util.List;
3532
import java.util.UUID;
3633
import java.util.concurrent.*;
37-
import java.util.concurrent.atomic.AtomicReference;
38-
import java.util.function.Supplier;
3934
import org.junit.Test;
4035
import org.mockito.stubbing.Answer;
4136
import org.slf4j.Logger;
@@ -74,8 +69,6 @@ public void concurrentPollRequestLockTest() throws Exception {
7469
client,
7570
"default",
7671
"task_queue",
77-
"test-worker-instance-key",
78-
java.util.Collections::emptyList,
7972
"sticky_task_queue",
8073
SingleWorkerOptions.newBuilder()
8174
.setIdentity("test_identity")
@@ -247,8 +240,6 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception {
247240
client,
248241
"default",
249242
"task_queue",
250-
"test-worker-instance-key",
251-
java.util.Collections::emptyList,
252243
"sticky_task_queue",
253244
SingleWorkerOptions.newBuilder()
254245
.setIdentity("test_identity")
@@ -393,8 +384,6 @@ public boolean isAnyTypeSupported() {
393384
client,
394385
"default",
395386
"taskQueue",
396-
"test-worker-instance-key",
397-
java.util.Collections::emptyList,
398387
"sticky",
399388
SingleWorkerOptions.newBuilder()
400389
.setIdentity("test_identity")
@@ -447,80 +436,6 @@ public boolean isAnyTypeSupported() {
447436
worker.shutdown(new ShutdownManager(), true).get();
448437
}
449438

450-
@Test
451-
public void activeTaskQueueTypesEvaluatedAtShutdownTime() throws Exception {
452-
WorkflowServiceStubs client = mock(WorkflowServiceStubs.class);
453-
when(client.getServerCapabilities())
454-
.thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build());
455-
456-
WorkflowRunLockManager runLockManager = new WorkflowRunLockManager();
457-
Scope metricsScope = new NoopScope();
458-
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLockManager, metricsScope);
459-
SlotSupplier<WorkflowSlotInfo> slotSupplier = new FixedSizeSlotSupplier<>(10);
460-
461-
WorkflowTaskHandler taskHandler = mock(WorkflowTaskHandler.class);
462-
when(taskHandler.isAnyTypeSupported()).thenReturn(true);
463-
464-
// Supplier that starts with WORKFLOW only, then adds NEXUS later
465-
AtomicReference<List<TaskQueueType>> typesRef =
466-
new AtomicReference<>(Arrays.asList(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW));
467-
Supplier<List<TaskQueueType>> supplier = typesRef::get;
468-
469-
EagerActivityDispatcher eagerActivityDispatcher = mock(EagerActivityDispatcher.class);
470-
WorkflowWorker worker =
471-
new WorkflowWorker(
472-
client,
473-
"default",
474-
"task_queue",
475-
"test-worker-instance-key",
476-
supplier,
477-
null,
478-
SingleWorkerOptions.newBuilder()
479-
.setIdentity("test_identity")
480-
.setBuildId(UUID.randomUUID().toString())
481-
.setPollerOptions(
482-
PollerOptions.newBuilder()
483-
.setPollerBehavior(new PollerBehaviorSimpleMaximum(1))
484-
.build())
485-
.setMetricsScope(metricsScope)
486-
.build(),
487-
runLockManager,
488-
cache,
489-
taskHandler,
490-
eagerActivityDispatcher,
491-
slotSupplier,
492-
new NamespaceCapabilities());
493-
494-
// Simulate registering Nexus after construction
495-
typesRef.set(
496-
Arrays.asList(
497-
TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW,
498-
TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY,
499-
TaskQueueType.TASK_QUEUE_TYPE_NEXUS));
500-
501-
WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub =
502-
mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class);
503-
when(client.futureStub()).thenReturn(futureStub);
504-
when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class)))
505-
.thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build()));
506-
507-
worker.shutdown(new ShutdownManager(), true).get(5, TimeUnit.SECONDS);
508-
509-
org.mockito.ArgumentCaptor<ShutdownWorkerRequest> captor =
510-
org.mockito.ArgumentCaptor.forClass(ShutdownWorkerRequest.class);
511-
verify(futureStub).shutdownWorker(captor.capture());
512-
List<TaskQueueType> shutdownTypes = captor.getValue().getTaskQueueTypesList();
513-
assertTrue(
514-
"ShutdownWorkerRequest should include NEXUS type added after construction",
515-
shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_NEXUS));
516-
assertTrue(
517-
"ShutdownWorkerRequest should include WORKFLOW type",
518-
shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW));
519-
assertTrue(
520-
"ShutdownWorkerRequest should include ACTIVITY type",
521-
shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY));
522-
}
523-
524439
private ReplayWorkflowFactory setUpMockWorkflowFactory() throws Throwable {
525440
ReplayWorkflow mockWorkflow = mock(ReplayWorkflow.class);
526441
ReplayWorkflowFactory mockFactory = mock(ReplayWorkflowFactory.class);

0 commit comments

Comments
 (0)