Skip to content

Expose poller automation #275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions temporalio/ext/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,15 +480,21 @@ fn build_config(options: Struct) -> Result<WorkerConfig, Error> {
})
.client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
.max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
))
.workflow_task_poller_behavior({
let poller_behavior = options
.child(id!("workflow_task_poller_behavior"))?
.ok_or_else(|| error!("Worker options must have workflow_task_poller_behavior"))?;
extract_poller_behavior(poller_behavior)?
})
.nonsticky_to_sticky_poll_ratio(
options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
)
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
))
.activity_task_poller_behavior({
let poller_behavior = options
.child(id!("activity_task_poller_behavior"))?
.ok_or_else(|| error!("Worker options must have activity_task_poller_behavior"))?;
extract_poller_behavior(poller_behavior)?
})
.no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
.sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
Expand Down Expand Up @@ -605,3 +611,15 @@ fn build_tuner_resource_options<SK: SlotKind>(
Some(slots_options),
))
}

fn extract_poller_behavior(poller_behavior: Struct) -> Result<PollerBehavior, Error> {
Ok(if poller_behavior.member::<usize>(id!("initial")).is_ok() {
PollerBehavior::Autoscaling {
minimum: poller_behavior.member::<usize>(id!("minimum"))?,
maximum: poller_behavior.member::<usize>(id!("maximum"))?,
initial: poller_behavior.member::<usize>(id!("initial"))?,
}
} else {
PollerBehavior::SimpleMaximum(poller_behavior.member::<usize>(id!("simple_maximum"))?)
})
}
16 changes: 14 additions & 2 deletions temporalio/lib/temporalio/internal/bridge/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class Worker
:tuner,
:identity_override,
:max_cached_workflows,
:max_concurrent_workflow_task_polls,
:workflow_task_poller_behavior,
:nonsticky_to_sticky_poll_ratio,
:max_concurrent_activity_task_polls,
:activity_task_poller_behavior,
:no_remote_activities,
:sticky_queue_schedule_to_start_timeout,
:max_heartbeat_throttle_interval,
Expand Down Expand Up @@ -65,6 +65,18 @@ class Worker
keyword_init: true
)

PollerBehaviorSimpleMaximum = Struct.new(
:simple_maximum,
keyword_init: true
)

PollerBehaviorAutoscaling = Struct.new(
:minimum,
:maximum,
:initial,
keyword_init: true
)

def self.finalize_shutdown_all(workers)
queue = Queue.new
async_finalize_all(workers, queue)
Expand Down
87 changes: 34 additions & 53 deletions temporalio/lib/temporalio/versioning_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,69 +6,50 @@ module Temporalio
# Base class for version overrides that can be provided in start workflow options.
# Used to control the versioning behavior of workflows started with this override.
#
# Use factory methods {.auto_upgrade} or {.pinned} to create instances.
#
# WARNING: Experimental API.
class VersioningOverride
# Creates an auto-upgrade versioning override
# The workflow will auto-upgrade to the current deployment version on the next workflow task.
#
# @return [AutoUpgradeVersioningOverride] An auto-upgrade versioning override
def self.auto_upgrade
AutoUpgradeVersioningOverride.new
end

# Creates a pinned versioning override
# The workflow will be pinned to a specific deployment version.
#
# @param version [WorkerDeploymentVersion] The worker deployment version to pin the workflow to
# @return [PinnedVersioningOverride] A pinned versioning override
def self.pinned(version)
PinnedVersioningOverride.new(version)
end

# @!visibility private
def _to_proto
raise NotImplementedError, 'Subclasses must implement this method'
end
end

# Represents a versioning override to pin a workflow to a specific version
class PinnedVersioningOverride < VersioningOverride
# The worker deployment version to pin to
# @return [WorkerDeploymentVersion]
attr_reader :version

# Create a new pinned versioning override
#
# @param version [WorkerDeploymentVersion] The worker deployment version to pin to
def initialize(version)
@version = version
super()
end

# TODO: Remove deprecated field setting once removed from server

# @!visibility private
def _to_proto
Temporalio::Api::Workflow::V1::VersioningOverride.new(
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED,
pinned_version: @version.to_canonical_string,
pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new(
version: @version._to_proto
# Represents a versioning override to pin a workflow to a specific version
class Pinned < VersioningOverride
# The worker deployment version to pin to
# @return [WorkerDeploymentVersion]
attr_reader :version

# Create a new pinned versioning override
#
# @param version [WorkerDeploymentVersion] The worker deployment version to pin to
def initialize(version)
@version = version
super()
end

# TODO: Remove deprecated field setting once removed from server

# @!visibility private
def _to_proto
Temporalio::Api::Workflow::V1::VersioningOverride.new(
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED,
pinned_version: @version.to_canonical_string,
pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new(
version: @version._to_proto
)
)
)
end
end
end

# Represents a versioning override to auto-upgrade a workflow
class AutoUpgradeVersioningOverride < VersioningOverride
# @!visibility private
def _to_proto
Temporalio::Api::Workflow::V1::VersioningOverride.new(
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE,
auto_upgrade: true
)
# Represents a versioning override to auto-upgrade a workflow
class AutoUpgrade < VersioningOverride
# @!visibility private
def _to_proto
Temporalio::Api::Workflow::V1::VersioningOverride.new(
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE,
auto_upgrade: true
)
end
end
end
end
15 changes: 13 additions & 2 deletions temporalio/lib/temporalio/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require 'temporalio/internal/worker/workflow_worker'
require 'temporalio/worker/activity_executor'
require 'temporalio/worker/interceptor'
require 'temporalio/worker/poller_behavior'
require 'temporalio/worker/thread_pool'
require 'temporalio/worker/tuner'
require 'temporalio/worker/workflow_executor'
Expand Down Expand Up @@ -52,6 +53,8 @@ class Worker
:workflow_payload_codec_thread_pool,
:unsafe_workflow_io_enabled,
:deployment_options,
:workflow_task_poller_behavior,
:activity_task_poller_behavior,
:debug_mode
)

Expand Down Expand Up @@ -354,6 +357,10 @@ def self.default_illegal_workflow_calls
# with a block for narrower enabling of IO.
# @param deployment_options [DeploymentOptions, nil] Deployment options for the worker.
# WARNING: This is an experimental feature and may change in the future.
# @param workflow_task_poller_behavior [PollerBehavior] Specify the behavior of workflow task
# polling. Defaults to a 5-poller maximum.
# @param activity_task_poller_behavior [PollerBehavior] Specify the behavior of activity task
# polling. Defaults to a 5-poller maximum.
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
# `true` or `1`.
Expand Down Expand Up @@ -385,6 +392,8 @@ def initialize(
workflow_payload_codec_thread_pool: nil,
unsafe_workflow_io_enabled: false,
deployment_options: Worker.default_deployment_options,
workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls),
activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls),
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
Expand Down Expand Up @@ -419,6 +428,8 @@ def initialize(
workflow_payload_codec_thread_pool:,
unsafe_workflow_io_enabled:,
deployment_options:,
workflow_task_poller_behavior:,
activity_task_poller_behavior:,
debug_mode:
).freeze

Expand Down Expand Up @@ -446,9 +457,9 @@ def initialize(
tuner: tuner._to_bridge_options,
identity_override: identity,
max_cached_workflows:,
max_concurrent_workflow_task_polls:,
workflow_task_poller_behavior: workflow_task_poller_behavior._to_bridge_options,
nonsticky_to_sticky_poll_ratio:,
max_concurrent_activity_task_polls:,
activity_task_poller_behavior: activity_task_poller_behavior._to_bridge_options,
# For shutdown to work properly, we must disable remote activities
# ourselves if there are no activities
no_remote_activities: no_remote_activities || activities.empty?,
Expand Down
61 changes: 61 additions & 0 deletions temporalio/lib/temporalio/worker/poller_behavior.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

module Temporalio
class Worker
# Base class for poller behaviors that control how polling scales.
class PollerBehavior
# @!visibility private
def _to_bridge_options
raise NotImplementedError, 'Subclasses must implement this method'
end

# A poller behavior that attempts to poll as long as a slot is available, up to the
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
class SimpleMaximum < PollerBehavior
# @return [Integer] Maximum number of concurrent poll requests.
attr_reader :maximum

# @param maximum [Integer] Maximum number of concurrent poll requests.
def initialize(maximum)
super()
@maximum = maximum
end

# @!visibility private
def _to_bridge_options
Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: @maximum)
end
end

# A poller behavior that automatically scales the number of pollers based on feedback
# from the server. A slot must be available before beginning polling.
class Autoscaling < PollerBehavior
# @return [Integer] Minimum number of poll calls (assuming slots are available).
attr_reader :minimum
# @return [Integer] Maximum number of poll calls that will ever be open at once.
attr_reader :maximum
# @return [Integer] Number of polls attempted initially before scaling kicks in.
attr_reader :initial

# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
def initialize(minimum: 1, maximum: 100, initial: 5)
super()
@minimum = minimum
@maximum = maximum
@initial = initial
end

# @!visibility private
def _to_bridge_options
Internal::Bridge::Worker::PollerBehaviorAutoscaling.new(
minimum: @minimum,
maximum: @maximum,
initial: @initial
)
end
end
end
end
end
7 changes: 5 additions & 2 deletions temporalio/lib/temporalio/worker/workflow_replayer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'temporalio/internal/worker/multi_runner'
require 'temporalio/internal/worker/workflow_worker'
require 'temporalio/worker/interceptor'
require 'temporalio/worker/poller_behavior'
require 'temporalio/worker/thread_pool'
require 'temporalio/worker/tuner'
require 'temporalio/worker/workflow_executor'
Expand Down Expand Up @@ -205,9 +206,11 @@ def initialize(
)._to_bridge_options,
identity_override: options.identity,
max_cached_workflows: 2,
max_concurrent_workflow_task_polls: 2,
workflow_task_poller_behavior:
Temporalio::Worker::PollerBehavior::SimpleMaximum.new(2)._to_bridge_options,
nonsticky_to_sticky_poll_ratio: 1.0,
max_concurrent_activity_task_polls: 1,
activity_task_poller_behavior:
Temporalio::Worker::PollerBehavior::SimpleMaximum.new(1)._to_bridge_options,
no_remote_activities: true,
sticky_queue_schedule_to_start_timeout: 1.0,
max_heartbeat_throttle_interval: 1.0,
Expand Down
22 changes: 18 additions & 4 deletions temporalio/sig/temporalio/internal/bridge/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ module Temporalio
attr_accessor tuner: TunerOptions
attr_accessor identity_override: String?
attr_accessor max_cached_workflows: Integer
attr_accessor max_concurrent_workflow_task_polls: Integer
attr_accessor workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
attr_accessor nonsticky_to_sticky_poll_ratio: Float
attr_accessor max_concurrent_activity_task_polls: Integer
attr_accessor activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
attr_accessor no_remote_activities: bool
attr_accessor sticky_queue_schedule_to_start_timeout: Float
attr_accessor max_heartbeat_throttle_interval: Float
Expand All @@ -32,9 +32,9 @@ module Temporalio
tuner: TunerOptions,
identity_override: String?,
max_cached_workflows: Integer,
max_concurrent_workflow_task_polls: Integer,
workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling,
nonsticky_to_sticky_poll_ratio: Float,
max_concurrent_activity_task_polls: Integer,
activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling,
no_remote_activities: bool,
sticky_queue_schedule_to_start_timeout: Float,
max_heartbeat_throttle_interval: Float,
Expand Down Expand Up @@ -142,6 +142,20 @@ module Temporalio
) -> void
end

class PollerBehaviorSimpleMaximum
attr_accessor simple_maximum: Integer

def initialize: (simple_maximum: Integer) -> void
end

class PollerBehaviorAutoscaling
attr_accessor minimum: Integer
attr_accessor maximum: Integer
attr_accessor initial: Integer

def initialize: (minimum: Integer, maximum: Integer, initial: Integer) -> void
end

class WorkflowReplayer
def self.new: (Runtime runtime, Options options) -> [WorkflowReplayer, Worker]

Expand Down
21 changes: 9 additions & 12 deletions temporalio/sig/temporalio/versioning_override.rbs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
module Temporalio
class VersioningOverride
def self.auto_upgrade: -> AutoUpgradeVersioningOverride
def self.pinned: (WorkerDeploymentVersion version) -> PinnedVersioningOverride

def _to_proto: -> untyped
end

class PinnedVersioningOverride < VersioningOverride
attr_reader version: WorkerDeploymentVersion
class Pinned < VersioningOverride
attr_reader version: WorkerDeploymentVersion

def initialize: (WorkerDeploymentVersion version) -> void
def _to_proto: -> untyped
end
def initialize: (WorkerDeploymentVersion version) -> void
def _to_proto: -> untyped
end

class AutoUpgradeVersioningOverride < VersioningOverride
def initialize: -> void
def _to_proto: -> untyped
class AutoUpgrade < VersioningOverride
def initialize: -> void
def _to_proto: -> untyped
end
end
end
Loading
Loading