diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 048d39096..14938fa3d 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -410,8 +410,9 @@ mod config { ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, api::worker::{ - ActivitySlotKind, LocalActivitySlotKind, PollerBehavior, SlotKind, WorkerConfig, - WorkerConfigBuilder, WorkerConfigBuilderError, WorkflowSlotKind, + ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior, + SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, + WorkflowSlotKind, }, }; @@ -428,8 +429,8 @@ mod config { namespace: String, tuner: WorkerTuner, non_sticky_to_sticky_poll_ratio: f32, - max_concurrent_workflow_task_polls: usize, - max_concurrent_activity_task_polls: usize, + workflow_task_poller_behavior: PollerBehavior, + activity_task_poller_behavior: PollerBehavior, enable_non_local_activities: bool, sticky_queue_schedule_to_start_timeout: Duration, max_cached_workflows: usize, @@ -440,6 +441,18 @@ mod config { shutdown_grace_time: Option, } + #[derive(TryFromJs)] + pub enum PollerBehavior { + SimpleMaximum { + maximum: usize, + }, + Autoscaling { + minimum: usize, + maximum: usize, + initial: usize, + }, + } + impl BridgeWorkerOptions { pub(crate) fn into_core_config(self) -> Result { // Set all other options @@ -452,12 +465,8 @@ mod config { .namespace(self.namespace) .tuner(self.tuner.into_core_config()?) .nonsticky_to_sticky_poll_ratio(self.non_sticky_to_sticky_poll_ratio) - .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum( - self.max_concurrent_workflow_task_polls, - )) - .activity_task_poller_behavior(PollerBehavior::SimpleMaximum( - self.max_concurrent_activity_task_polls, - )) + .workflow_task_poller_behavior(self.workflow_task_poller_behavior) + .activity_task_poller_behavior(self.activity_task_poller_behavior) .no_remote_activities(!self.enable_non_local_activities) .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout) .max_cached_workflows(self.max_cached_workflows) @@ -470,6 +479,25 @@ mod config { } } + impl From for CorePollerBehavior { + fn from(val: PollerBehavior) -> Self { + match val { + PollerBehavior::SimpleMaximum { maximum } => { + CorePollerBehavior::SimpleMaximum(maximum) + } + PollerBehavior::Autoscaling { + minimum, + maximum, + initial, + } => CorePollerBehavior::Autoscaling { + minimum, + maximum, + initial, + }, + } + } + } + #[derive(TryFromJs)] #[allow(clippy::struct_field_names)] pub(super) struct WorkerTuner { diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index eda9bd3b3..539a8ce4d 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -179,8 +179,8 @@ export interface WorkerOptions { namespace: string; tuner: WorkerTunerOptions; nonStickyToStickyPollRatio: number; - maxConcurrentWorkflowTaskPolls: number; - maxConcurrentActivityTaskPolls: number; + workflowTaskPollerBehavior: PollerBehavior; + activityTaskPollerBehavior: PollerBehavior; enableNonLocalActivities: boolean; stickyQueueScheduleToStartTimeout: number; maxCachedWorkflows: number; @@ -191,6 +191,18 @@ export interface WorkerOptions { shutdownGraceTime: number; } +export type PollerBehavior = + | { + type: 'simple-maximum'; + maximum: number; + } + | { + type: 'autoscaling'; + minimum: number; + maximum: number; + initial: number; + }; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Worker Tuner //////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index 957d6bd44..6458f6d7c 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -277,8 +277,16 @@ const GenericConfigs = { }, }, nonStickyToStickyPollRatio: 0.5, - maxConcurrentWorkflowTaskPolls: 1, - maxConcurrentActivityTaskPolls: 1, + workflowTaskPollerBehavior: { + type: 'simple-maximum', + maximum: 1, + }, + activityTaskPollerBehavior: { + type: 'autoscaling', + minimum: 1, + initial: 5, + maximum: 100, + }, enableNonLocalActivities: false, stickyQueueScheduleToStartTimeout: 1000, maxCachedWorkflows: 1000, diff --git a/packages/test/src/test-worker-poller-autoscale.ts b/packages/test/src/test-worker-poller-autoscale.ts new file mode 100644 index 000000000..edd96c8b9 --- /dev/null +++ b/packages/test/src/test-worker-poller-autoscale.ts @@ -0,0 +1,83 @@ +import { v4 as uuid } from 'uuid'; +import fetch from 'node-fetch'; +import test from 'ava'; +import { Runtime, Worker } from '@temporalio/worker'; +import { getRandomPort, TestWorkflowEnvironment } from './helpers'; +import * as activities from './activities'; +import * as workflows from './workflows'; + +test.serial('Can run autoscaling polling worker', async (t) => { + const port = await getRandomPort(); + Runtime.install({ + telemetryOptions: { + metrics: { + prometheus: { + bindAddress: `127.0.0.1:${port}`, + }, + }, + }, + }); + const localEnv = await TestWorkflowEnvironment.createLocal(); + + try { + const taskQueue = `autoscale-pollers-${uuid()}`; + const worker = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + connection: localEnv.nativeConnection, + taskQueue, + workflowTaskPollerBehavior: { + type: 'autoscaling', + initial: 2, + }, + activityTaskPollerBehavior: { + type: 'autoscaling', + initial: 2, + }, + }); + const workerPromise = worker.run(); + + // Give pollers a beat to start + await new Promise((resolve) => setTimeout(resolve, 300)); + + const resp = await fetch(`http://127.0.0.1:${port}/metrics`); + const metricsText = await resp.text(); + const metricsLines = metricsText.split('\n'); + + const matches = metricsLines.filter((l) => l.includes('temporal_num_pollers')); + const activity_pollers = matches.filter((l) => l.includes('activity_task')); + t.is(activity_pollers.length, 1, 'Should have exactly one activity poller metric'); + t.true(activity_pollers[0].endsWith('2'), 'Activity poller count should be 2'); + const workflow_pollers = matches.filter((l) => l.includes('workflow_task')); + t.is(workflow_pollers.length, 2, 'Should have exactly two workflow poller metrics (sticky and non-sticky)'); + + // There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on + // initialization timing. + t.true( + workflow_pollers[0].endsWith('2') || workflow_pollers[0].endsWith('1'), + 'First workflow poller count should be 1 or 2' + ); + t.true( + workflow_pollers[1].endsWith('2') || workflow_pollers[1].endsWith('1'), + 'Second workflow poller count should be 1 or 2' + ); + + const workflowPromises = Array(20) + .fill(0) + .map(async (_) => { + const handle = await localEnv.client.workflow.start(workflows.waitOnSignalThenActivity, { + taskQueue, + workflowId: `resource-based-${uuid()}`, + }); + await handle.signal('my-signal', 'finish'); + return handle.result(); + }); + + await Promise.all(workflowPromises); + worker.shutdown(); + await workerPromise; + t.pass(); + } finally { + await localEnv.teardown(); + } +}); diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index a77256420..d4959ec2f 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -91,3 +91,4 @@ export * from './wait-on-user'; export * from './workflow-cancellation-scenarios'; export * from './upsert-and-read-memo'; export * from './updates-ordering'; +export * from './wait-on-signal-then-activity'; diff --git a/packages/test/src/workflows/wait-on-signal-then-activity.ts b/packages/test/src/workflows/wait-on-signal-then-activity.ts new file mode 100644 index 000000000..b34ee129a --- /dev/null +++ b/packages/test/src/workflows/wait-on-signal-then-activity.ts @@ -0,0 +1,15 @@ +import * as wf from '@temporalio/workflow'; +import type * as activities from '../activities'; + +const { echo } = wf.proxyActivities({ startToCloseTimeout: '5s' }); +export const mySignal = wf.defineSignal<[string]>('my-signal'); +export async function waitOnSignalThenActivity(): Promise { + let lastSignal = ''; + + wf.setHandler(mySignal, (value: string) => { + lastSignal = value; + }); + + await wf.condition(() => lastSignal === 'finish'); + await echo('hi'); +} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index ce9946c93..44c8e1941 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -282,6 +282,20 @@ export interface WorkerOptions { */ maxConcurrentWorkflowTaskPolls?: number; + /** + * Specify the behavior of workflow task polling. + * + * @default A fixed maximum whose value is min(10, maxConcurrentWorkflowTaskExecutions). + */ + workflowTaskPollerBehavior?: PollerBehavior; + + /** + * Specify the behavior of activity task polling. + * + * @default A fixed maximum whose value is min(10, maxConcurrentActivityTaskExecutions). + */ + activityTaskPollerBehavior?: PollerBehavior; + /** * Maximum number of Activity tasks to poll concurrently. * @@ -499,6 +513,48 @@ export interface WorkerOptions { }; } +export type PollerBehavior = PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling; + +/** + * A poller behavior that will automatically scale the number of pollers based on feedback + * from the server. A slot must be available before beginning polling. + * + * @experimental Poller autoscaling is currently experimental and may change in future versions. + */ +export interface PollerBehaviorAutoscaling { + type: 'autoscaling'; + /** + * At least this many poll calls will always be attempted (assuming slots are available). + * Cannot be lower than 1. Defaults to 1. + */ + minimum?: number; + /** + * At most this many poll calls will ever be open at once. Must be >= `minimum`. + * Defaults to 100. + */ + maximum?: number; + /** + * This many polls will be attempted initially before scaling kicks in. Must be between + * `minimum` and `maximum`. + * Defaults to 5. + */ + initial?: number; +} + +/** + * A poller behavior that will attempt to poll as long as a slot is available, up to the + * provided maximum. + */ +export interface PollerBehaviorSimpleMaximum { + type: 'simple-maximum'; + /** + * The maximum poller number, assumes the same default as described in + * {@link WorkerOptions.maxConcurrentWorkflowTaskPolls} or + * {@link WorkerOptions.maxConcurrentActivityTaskPolls}. + */ + maximum?: number; +} + // Replay Worker /////////////////////////////////////////////////////////////////////////////////// /** @@ -517,6 +573,8 @@ export interface ReplayWorkerOptions | 'maxConcurrentWorkflowTaskExecutions' | 'maxConcurrentActivityTaskPolls' | 'maxConcurrentWorkflowTaskPolls' + | 'workflowTaskPollerBehavior' + | 'activityTaskPollerBehavior' | 'nonStickyToStickyPollRatio' | 'maxHeartbeatThrottleInterval' | 'defaultHeartbeatThrottleInterval' @@ -652,8 +710,6 @@ export type WorkerOptionsWithDefaults = WorkerOptions & | 'identity' | 'useVersioning' | 'shutdownGraceTime' - | 'maxConcurrentWorkflowTaskPolls' - | 'maxConcurrentActivityTaskPolls' | 'nonStickyToStickyPollRatio' | 'enableNonLocalActivities' | 'stickyQueueScheduleToStartTimeout' @@ -678,6 +734,9 @@ export type WorkerOptionsWithDefaults = WorkerOptions & * @default 5s */ isolateExecutionTimeout: Duration; + + workflowTaskPollerBehavior: Required; + activityTaskPollerBehavior: Required; }; /** @@ -715,6 +774,8 @@ function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): Worker maxConcurrentActivityTaskExecutions, maxConcurrentLocalActivityExecutions, maxConcurrentWorkflowTaskExecutions, + workflowTaskPollerBehavior, + activityTaskPollerBehavior, ...rest } = options; const debugMode = options.debugMode || isSet(process.env.TEMPORAL_DEBUG); @@ -767,6 +828,21 @@ function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): Worker }; } + const createPollerBehavior = (defaultMax: number, behavior?: PollerBehavior): Required => + !behavior + ? { type: 'simple-maximum', maximum: defaultMax } + : behavior.type === 'simple-maximum' + ? { type: 'simple-maximum', maximum: behavior.maximum ?? defaultMax } + : { + type: 'autoscaling', + minimum: behavior.minimum ?? 1, + initial: behavior.initial ?? 5, + maximum: behavior.maximum ?? 100, + }; + + const wftPollerBehavior = createPollerBehavior(maxWFTPolls, workflowTaskPollerBehavior); + const atPollerBehavior = createPollerBehavior(maxATPolls, activityTaskPollerBehavior); + return { namespace: namespace ?? 'default', identity: `${process.pid}@${os.hostname()}`, @@ -774,8 +850,8 @@ function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): Worker buildId, shutdownGraceTime: 0, enableNonLocalActivities: true, - maxConcurrentWorkflowTaskPolls: maxWFTPolls, - maxConcurrentActivityTaskPolls: maxATPolls, + workflowTaskPollerBehavior: wftPollerBehavior, + activityTaskPollerBehavior: atPollerBehavior, stickyQueueScheduleToStartTimeout: '10s', maxHeartbeatThrottleInterval: '60s', defaultHeartbeatThrottleInterval: '30s', @@ -854,8 +930,8 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n namespace: opts.namespace, tuner: opts.tuner, nonStickyToStickyPollRatio: opts.nonStickyToStickyPollRatio, - maxConcurrentWorkflowTaskPolls: opts.maxConcurrentWorkflowTaskPolls, - maxConcurrentActivityTaskPolls: opts.maxConcurrentActivityTaskPolls, + workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior), + activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), enableNonLocalActivities: opts.enableNonLocalActivities, stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout), maxCachedWorkflows: opts.maxCachedWorkflows, @@ -867,6 +943,25 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n }; } +export function toNativeTaskPollerBehavior(behavior: Required): native.PollerBehavior { + switch (behavior.type) { + case 'simple-maximum': + return { + type: 'simple-maximum', + maximum: behavior.maximum, + }; + case 'autoscaling': + return { + type: 'autoscaling', + minimum: behavior.minimum, + initial: behavior.initial, + maximum: behavior.maximum, + }; + default: + throw new Error(`Unknown poller behavior type: ${(behavior as any).type}`); + } +} + // Utils /////////////////////////////////////////////////////////////////////////////////////////// function isSet(env: string | undefined): boolean {