Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
--dynamic-config-value history.MaxBufferedQueryCount=10000 \
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
--dynamic-config-value history.enableRequestIdRefLinks=true \
--dynamic-config-value frontend.ListWorkersEnabled=true \
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
sleep 10s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.internal.common.PluginUtils;
import io.temporal.internal.sync.StubMarker;
import io.temporal.internal.worker.HeartbeatManager;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
Expand Down Expand Up @@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient
private final Scope metricsScope;
private final WorkflowClientInterceptor[] interceptors;
private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
private final String workerGroupingKey = java.util.UUID.randomUUID().toString();
private final @Nullable HeartbeatManager heartbeatManager;

/**
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
Expand Down Expand Up @@ -112,6 +115,14 @@ public static WorkflowClient newInstance(
options.getNamespace(),
options.getIdentity(),
options.getDataConverter());

java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval();
if (!heartbeatInterval.isNegative()) {
this.heartbeatManager =
new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval);
} else {
this.heartbeatManager = null;
}
Comment thread
cursor[bot] marked this conversation as resolved.
}

private WorkflowClientCallsInterceptor initializeClientInvoker() {
Expand Down Expand Up @@ -790,6 +801,17 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) {
workerFactoryRegistry.deregister(workerFactory);
}

@Override
public String getWorkerGroupingKey() {
return workerGroupingKey;
}

@Override
@Nullable
public HeartbeatManager getHeartbeatManager() {
return heartbeatManager;
}

@Override
public NexusStartWorkflowResponse startNexus(
NexusStartWorkflowRequest request, Functions.Proc workflow) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.temporal.client;

import com.google.common.base.Preconditions;
import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -49,6 +51,7 @@ public static final class Builder {
private List<ContextPropagator> contextPropagators;
private QueryRejectCondition queryRejectCondition;
private WorkflowClientPlugin[] plugins;
private Duration workerHeartbeatInterval;

private Builder() {}

Expand All @@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) {
contextPropagators = options.contextPropagators;
queryRejectCondition = options.queryRejectCondition;
plugins = options.plugins;
workerHeartbeatInterval = options.workerHeartbeatInterval;
}

public Builder setNamespace(String namespace) {
Expand Down Expand Up @@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) {
return this;
}

/**
* Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to
* zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must
* be between 1 and 60 seconds inclusive.
*
* @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable
*/
@Experimental
public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) {
this.workerHeartbeatInterval = workerHeartbeatInterval;
return this;
}

public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
Expand All @@ -162,7 +179,8 @@ public WorkflowClientOptions build() {
binaryChecksum,
contextPropagators,
queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
plugins == null ? EMPTY_PLUGINS : plugins,
resolveHeartbeatInterval(workerHeartbeatInterval));
}

/**
Expand All @@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
queryRejectCondition == null
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
: queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
plugins == null ? EMPTY_PLUGINS : plugins,
resolveHeartbeatInterval(workerHeartbeatInterval));
}

private static Duration resolveHeartbeatInterval(Duration raw) {
if (raw == null || raw.isZero()) {
return Duration.ofSeconds(60);
}
if (raw.isNegative()) {
return raw;
}
Preconditions.checkArgument(
raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0,
"workerHeartbeatInterval must be between 1s and 60s, got %s",
raw);
return raw;
}
}

Expand All @@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private final WorkflowClientPlugin[] plugins;

private final Duration workerHeartbeatInterval;

private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
Expand All @@ -223,7 +258,8 @@ private WorkflowClientOptions(
String binaryChecksum,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
WorkflowClientPlugin[] plugins,
Duration workerHeartbeatInterval) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
Expand All @@ -232,6 +268,7 @@ private WorkflowClientOptions(
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.plugins = plugins;
this.workerHeartbeatInterval = workerHeartbeatInterval;
}

/**
Expand Down Expand Up @@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() {
return plugins;
}

/**
* Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative
* duration means heartbeating is explicitly disabled.
*/
@Experimental
public Duration getWorkerHeartbeatInterval() {
return workerHeartbeatInterval;
}

@Override
public String toString() {
return "WorkflowClientOptions{"
Expand All @@ -311,6 +357,8 @@ public String toString() {
+ queryRejectCondition
+ ", plugins="
+ Arrays.toString(plugins)
+ ", workerHeartbeatInterval="
+ workerHeartbeatInterval
+ '}';
}

Expand All @@ -326,7 +374,9 @@ public boolean equals(Object o) {
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition
&& Arrays.equals(plugins, that.plugins);
&& Arrays.equals(plugins, that.plugins)
&& com.google.common.base.Objects.equal(
workerHeartbeatInterval, that.workerHeartbeatInterval);
}

@Override
Expand All @@ -339,6 +389,7 @@ public int hashCode() {
binaryChecksum,
contextPropagators,
queryRejectCondition,
Arrays.hashCode(plugins));
Arrays.hashCode(plugins),
workerHeartbeatInterval);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.temporal.internal.client;

import io.temporal.client.WorkflowClient;
import io.temporal.internal.worker.HeartbeatManager;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Functions;
import javax.annotation.Nullable;

/**
* From OOP point of view, there is no reason for this interface not to extend {@link
Expand All @@ -18,4 +20,9 @@ public interface WorkflowClientInternal {
void deregisterWorkerFactory(WorkerFactory workerFactory);

NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);

String getWorkerGroupingKey();

@Nullable
HeartbeatManager getHeartbeatManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.temporal.worker.PollerTypeMetricsTag;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
Expand All @@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask<ActivityTas
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final Scope metricsScope;
private final PollActivityTaskQueueRequest pollRequest;
private final AtomicInteger pollGauge = new AtomicInteger();
private final PollerTracker pollerTracker;

@SuppressWarnings("deprecation")
public ActivityPollTask(
Expand All @@ -42,10 +41,12 @@ public ActivityPollTask(
double activitiesPerSecond,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
@Nonnull PollerTracker pollerTracker) {
this.service = Objects.requireNonNull(service);
this.slotSupplier = slotSupplier;
this.metricsScope = Objects.requireNonNull(metricsScope);
this.pollerTracker = Objects.requireNonNull(pollerTracker);

PollActivityTaskQueueRequest.Builder pollRequest =
PollActivityTaskQueueRequest.newBuilder()
Expand Down Expand Up @@ -100,7 +101,7 @@ public ActivityTask poll() {

MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
.gauge(MetricsType.NUM_POLLERS)
.update(pollGauge.incrementAndGet());
.update(pollerTracker.pollStarted());

try {
response =
Expand All @@ -119,14 +120,15 @@ public ActivityTask poll() {
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
pollerTracker.pollSucceeded();
return new ActivityTask(
response,
permit,
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
} finally {
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
.gauge(MetricsType.NUM_POLLERS)
.update(pollGauge.decrementAndGet());
.update(pollerTracker.pollCompleted());

if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
Expand Down
Loading
Loading