Skip to content

feat(worker): Expose Poller Automation #1704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,9 @@ mod config {
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder,
api::worker::{
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior, SlotKind, WorkerConfig,
WorkerConfigBuilder, WorkerConfigBuilderError, WorkflowSlotKind,
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior,
SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError,
WorkflowSlotKind,
},
};

Expand All @@ -428,8 +429,8 @@ mod config {
namespace: String,
tuner: WorkerTuner,
non_sticky_to_sticky_poll_ratio: f32,
max_concurrent_workflow_task_polls: usize,
max_concurrent_activity_task_polls: usize,
workflow_task_poller_behavior: PollerBehavior,
activity_task_poller_behavior: PollerBehavior,
enable_non_local_activities: bool,
sticky_queue_schedule_to_start_timeout: Duration,
max_cached_workflows: usize,
Expand All @@ -440,6 +441,18 @@ mod config {
shutdown_grace_time: Option<Duration>,
}

#[derive(TryFromJs)]
pub enum PollerBehavior {
SimpleMaximum {
maximum: usize,
},
Autoscaling {
minimum: usize,
maximum: usize,
initial: usize,
},
}

impl BridgeWorkerOptions {
pub(crate) fn into_core_config(self) -> Result<WorkerConfig, WorkerConfigBuilderError> {
// Set all other options
Expand All @@ -452,12 +465,8 @@ mod config {
.namespace(self.namespace)
.tuner(self.tuner.into_core_config()?)
.nonsticky_to_sticky_poll_ratio(self.non_sticky_to_sticky_poll_ratio)
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
self.max_concurrent_workflow_task_polls,
))
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
self.max_concurrent_activity_task_polls,
))
.workflow_task_poller_behavior(self.workflow_task_poller_behavior)
.activity_task_poller_behavior(self.activity_task_poller_behavior)
.no_remote_activities(!self.enable_non_local_activities)
.sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
.max_cached_workflows(self.max_cached_workflows)
Expand All @@ -470,6 +479,25 @@ mod config {
}
}

impl From<PollerBehavior> for CorePollerBehavior {
fn from(val: PollerBehavior) -> Self {
match val {
PollerBehavior::SimpleMaximum { maximum } => {
CorePollerBehavior::SimpleMaximum(maximum)
}
PollerBehavior::Autoscaling {
minimum,
maximum,
initial,
} => CorePollerBehavior::Autoscaling {
minimum,
maximum,
initial,
},
}
}
}

#[derive(TryFromJs)]
#[allow(clippy::struct_field_names)]
pub(super) struct WorkerTuner {
Expand Down
16 changes: 14 additions & 2 deletions packages/core-bridge/ts/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ export interface WorkerOptions {
namespace: string;
tuner: WorkerTunerOptions;
nonStickyToStickyPollRatio: number;
maxConcurrentWorkflowTaskPolls: number;
maxConcurrentActivityTaskPolls: number;
workflowTaskPollerBehavior: PollerBehavior;
activityTaskPollerBehavior: PollerBehavior;
enableNonLocalActivities: boolean;
stickyQueueScheduleToStartTimeout: number;
maxCachedWorkflows: number;
Expand All @@ -191,6 +191,18 @@ export interface WorkerOptions {
shutdownGraceTime: number;
}

export type PollerBehavior =
| {
type: 'simple-maximum';
maximum: number;
}
| {
type: 'autoscaling';
minimum: number;
maximum: number;
initial: number;
};

////////////////////////////////////////////////////////////////////////////////////////////////////
// Worker Tuner
////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
12 changes: 10 additions & 2 deletions packages/test/src/test-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,16 @@ const GenericConfigs = {
},
},
nonStickyToStickyPollRatio: 0.5,
maxConcurrentWorkflowTaskPolls: 1,
maxConcurrentActivityTaskPolls: 1,
workflowTaskPollerBehavior: {
type: 'simple-maximum',
maximum: 1,
},
activityTaskPollerBehavior: {
type: 'autoscaling',
minimum: 1,
initial: 5,
maximum: 100,
},
enableNonLocalActivities: false,
stickyQueueScheduleToStartTimeout: 1000,
maxCachedWorkflows: 1000,
Expand Down
83 changes: 83 additions & 0 deletions packages/test/src/test-worker-poller-autoscale.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { v4 as uuid } from 'uuid';
import fetch from 'node-fetch';
import test from 'ava';
import { Runtime, Worker } from '@temporalio/worker';
import { getRandomPort, TestWorkflowEnvironment } from './helpers';
import * as activities from './activities';
import * as workflows from './workflows';

test.serial('Can run autoscaling polling worker', async (t) => {
const port = await getRandomPort();
Runtime.install({
telemetryOptions: {
metrics: {
prometheus: {
bindAddress: `127.0.0.1:${port}`,
},
},
},
});
const localEnv = await TestWorkflowEnvironment.createLocal();

try {
const taskQueue = `autoscale-pollers-${uuid()}`;
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
connection: localEnv.nativeConnection,
taskQueue,
workflowTaskPollerBehavior: {
type: 'autoscaling',
initial: 2,
},
activityTaskPollerBehavior: {
type: 'autoscaling',
initial: 2,
},
});
const workerPromise = worker.run();

// Give pollers a beat to start
await new Promise((resolve) => setTimeout(resolve, 300));

const resp = await fetch(`http://127.0.0.1:${port}/metrics`);
const metricsText = await resp.text();
const metricsLines = metricsText.split('\n');

const matches = metricsLines.filter((l) => l.includes('temporal_num_pollers'));
const activity_pollers = matches.filter((l) => l.includes('activity_task'));
t.is(activity_pollers.length, 1, 'Should have exactly one activity poller metric');
t.true(activity_pollers[0].endsWith('2'), 'Activity poller count should be 2');
const workflow_pollers = matches.filter((l) => l.includes('workflow_task'));
t.is(workflow_pollers.length, 2, 'Should have exactly two workflow poller metrics (sticky and non-sticky)');

// There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on
// initialization timing.
t.true(
workflow_pollers[0].endsWith('2') || workflow_pollers[0].endsWith('1'),
'First workflow poller count should be 1 or 2'
);
t.true(
workflow_pollers[1].endsWith('2') || workflow_pollers[1].endsWith('1'),
'Second workflow poller count should be 1 or 2'
);

const workflowPromises = Array(20)
.fill(0)
.map(async (_) => {
const handle = await localEnv.client.workflow.start(workflows.waitOnSignalThenActivity, {
taskQueue,
workflowId: `resource-based-${uuid()}`,
});
await handle.signal('my-signal', 'finish');
return handle.result();
});

await Promise.all(workflowPromises);
worker.shutdown();
await workerPromise;
t.pass();
} finally {
await localEnv.teardown();
}
});
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,4 @@ export * from './wait-on-user';
export * from './workflow-cancellation-scenarios';
export * from './upsert-and-read-memo';
export * from './updates-ordering';
export * from './wait-on-signal-then-activity';
15 changes: 15 additions & 0 deletions packages/test/src/workflows/wait-on-signal-then-activity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import * as wf from '@temporalio/workflow';
import type * as activities from '../activities';

const { echo } = wf.proxyActivities<typeof activities>({ startToCloseTimeout: '5s' });
export const mySignal = wf.defineSignal<[string]>('my-signal');
export async function waitOnSignalThenActivity(): Promise<void> {
let lastSignal = '<none>';

wf.setHandler(mySignal, (value: string) => {
lastSignal = value;
});

await wf.condition(() => lastSignal === 'finish');
await echo('hi');
}
Loading
Loading