Skip to content

Commit 251c16f

Browse files
authored
Expose poller automation (#275)
1 parent 196e0ac commit 251c16f

File tree

12 files changed

+288
-82
lines changed

12 files changed

+288
-82
lines changed

temporalio/ext/src/worker.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -480,15 +480,21 @@ fn build_config(options: Struct) -> Result<WorkerConfig, Error> {
480480
})
481481
.client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
482482
.max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
483-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
484-
options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
485-
))
483+
.workflow_task_poller_behavior({
484+
let poller_behavior = options
485+
.child(id!("workflow_task_poller_behavior"))?
486+
.ok_or_else(|| error!("Worker options must have workflow_task_poller_behavior"))?;
487+
extract_poller_behavior(poller_behavior)?
488+
})
486489
.nonsticky_to_sticky_poll_ratio(
487490
options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
488491
)
489-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
490-
options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
491-
))
492+
.activity_task_poller_behavior({
493+
let poller_behavior = options
494+
.child(id!("activity_task_poller_behavior"))?
495+
.ok_or_else(|| error!("Worker options must have activity_task_poller_behavior"))?;
496+
extract_poller_behavior(poller_behavior)?
497+
})
492498
.no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
493499
.sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
494500
options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
@@ -605,3 +611,15 @@ fn build_tuner_resource_options<SK: SlotKind>(
605611
Some(slots_options),
606612
))
607613
}
614+
615+
fn extract_poller_behavior(poller_behavior: Struct) -> Result<PollerBehavior, Error> {
616+
Ok(if poller_behavior.member::<usize>(id!("initial")).is_ok() {
617+
PollerBehavior::Autoscaling {
618+
minimum: poller_behavior.member::<usize>(id!("minimum"))?,
619+
maximum: poller_behavior.member::<usize>(id!("maximum"))?,
620+
initial: poller_behavior.member::<usize>(id!("initial"))?,
621+
}
622+
} else {
623+
PollerBehavior::SimpleMaximum(poller_behavior.member::<usize>(id!("simple_maximum"))?)
624+
})
625+
}

temporalio/lib/temporalio/internal/bridge/worker.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ class Worker
1414
:tuner,
1515
:identity_override,
1616
:max_cached_workflows,
17-
:max_concurrent_workflow_task_polls,
17+
:workflow_task_poller_behavior,
1818
:nonsticky_to_sticky_poll_ratio,
19-
:max_concurrent_activity_task_polls,
19+
:activity_task_poller_behavior,
2020
:no_remote_activities,
2121
:sticky_queue_schedule_to_start_timeout,
2222
:max_heartbeat_throttle_interval,
@@ -65,6 +65,18 @@ class Worker
6565
keyword_init: true
6666
)
6767

68+
PollerBehaviorSimpleMaximum = Struct.new(
69+
:simple_maximum,
70+
keyword_init: true
71+
)
72+
73+
PollerBehaviorAutoscaling = Struct.new(
74+
:minimum,
75+
:maximum,
76+
:initial,
77+
keyword_init: true
78+
)
79+
6880
def self.finalize_shutdown_all(workers)
6981
queue = Queue.new
7082
async_finalize_all(workers, queue)

temporalio/lib/temporalio/versioning_override.rb

Lines changed: 34 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,69 +6,50 @@ module Temporalio
66
# Base class for version overrides that can be provided in start workflow options.
77
# Used to control the versioning behavior of workflows started with this override.
88
#
9-
# Use factory methods {.auto_upgrade} or {.pinned} to create instances.
10-
#
119
# WARNING: Experimental API.
1210
class VersioningOverride
13-
# Creates an auto-upgrade versioning override
14-
# The workflow will auto-upgrade to the current deployment version on the next workflow task.
15-
#
16-
# @return [AutoUpgradeVersioningOverride] An auto-upgrade versioning override
17-
def self.auto_upgrade
18-
AutoUpgradeVersioningOverride.new
19-
end
20-
21-
# Creates a pinned versioning override
22-
# The workflow will be pinned to a specific deployment version.
23-
#
24-
# @param version [WorkerDeploymentVersion] The worker deployment version to pin the workflow to
25-
# @return [PinnedVersioningOverride] A pinned versioning override
26-
def self.pinned(version)
27-
PinnedVersioningOverride.new(version)
28-
end
29-
3011
# @!visibility private
3112
def _to_proto
3213
raise NotImplementedError, 'Subclasses must implement this method'
3314
end
34-
end
35-
36-
# Represents a versioning override to pin a workflow to a specific version
37-
class PinnedVersioningOverride < VersioningOverride
38-
# The worker deployment version to pin to
39-
# @return [WorkerDeploymentVersion]
40-
attr_reader :version
41-
42-
# Create a new pinned versioning override
43-
#
44-
# @param version [WorkerDeploymentVersion] The worker deployment version to pin to
45-
def initialize(version)
46-
@version = version
47-
super()
48-
end
49-
50-
# TODO: Remove deprecated field setting once removed from server
5115

52-
# @!visibility private
53-
def _to_proto
54-
Temporalio::Api::Workflow::V1::VersioningOverride.new(
55-
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED,
56-
pinned_version: @version.to_canonical_string,
57-
pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new(
58-
version: @version._to_proto
16+
# Represents a versioning override to pin a workflow to a specific version
17+
class Pinned < VersioningOverride
18+
# The worker deployment version to pin to
19+
# @return [WorkerDeploymentVersion]
20+
attr_reader :version
21+
22+
# Create a new pinned versioning override
23+
#
24+
# @param version [WorkerDeploymentVersion] The worker deployment version to pin to
25+
def initialize(version)
26+
@version = version
27+
super()
28+
end
29+
30+
# TODO: Remove deprecated field setting once removed from server
31+
32+
# @!visibility private
33+
def _to_proto
34+
Temporalio::Api::Workflow::V1::VersioningOverride.new(
35+
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED,
36+
pinned_version: @version.to_canonical_string,
37+
pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new(
38+
version: @version._to_proto
39+
)
5940
)
60-
)
41+
end
6142
end
62-
end
6343

64-
# Represents a versioning override to auto-upgrade a workflow
65-
class AutoUpgradeVersioningOverride < VersioningOverride
66-
# @!visibility private
67-
def _to_proto
68-
Temporalio::Api::Workflow::V1::VersioningOverride.new(
69-
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE,
70-
auto_upgrade: true
71-
)
44+
# Represents a versioning override to auto-upgrade a workflow
45+
class AutoUpgrade < VersioningOverride
46+
# @!visibility private
47+
def _to_proto
48+
Temporalio::Api::Workflow::V1::VersioningOverride.new(
49+
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE,
50+
auto_upgrade: true
51+
)
52+
end
7253
end
7354
end
7455
end

temporalio/lib/temporalio/worker.rb

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
require 'temporalio/internal/worker/workflow_worker'
1414
require 'temporalio/worker/activity_executor'
1515
require 'temporalio/worker/interceptor'
16+
require 'temporalio/worker/poller_behavior'
1617
require 'temporalio/worker/thread_pool'
1718
require 'temporalio/worker/tuner'
1819
require 'temporalio/worker/workflow_executor'
@@ -52,6 +53,8 @@ class Worker
5253
:workflow_payload_codec_thread_pool,
5354
:unsafe_workflow_io_enabled,
5455
:deployment_options,
56+
:workflow_task_poller_behavior,
57+
:activity_task_poller_behavior,
5558
:debug_mode
5659
)
5760

@@ -354,6 +357,10 @@ def self.default_illegal_workflow_calls
354357
# with a block for narrower enabling of IO.
355358
# @param deployment_options [DeploymentOptions, nil] Deployment options for the worker.
356359
# WARNING: This is an experimental feature and may change in the future.
360+
# @param workflow_task_poller_behavior [PollerBehavior] Specify the behavior of workflow task
361+
# polling. Defaults to a 5-poller maximum.
362+
# @param activity_task_poller_behavior [PollerBehavior] Specify the behavior of activity task
363+
# polling. Defaults to a 5-poller maximum.
357364
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
358365
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
359366
# `true` or `1`.
@@ -385,6 +392,8 @@ def initialize(
385392
workflow_payload_codec_thread_pool: nil,
386393
unsafe_workflow_io_enabled: false,
387394
deployment_options: Worker.default_deployment_options,
395+
workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls),
396+
activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls),
388397
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
389398
)
390399
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
@@ -419,6 +428,8 @@ def initialize(
419428
workflow_payload_codec_thread_pool:,
420429
unsafe_workflow_io_enabled:,
421430
deployment_options:,
431+
workflow_task_poller_behavior:,
432+
activity_task_poller_behavior:,
422433
debug_mode:
423434
).freeze
424435

@@ -446,9 +457,9 @@ def initialize(
446457
tuner: tuner._to_bridge_options,
447458
identity_override: identity,
448459
max_cached_workflows:,
449-
max_concurrent_workflow_task_polls:,
460+
workflow_task_poller_behavior: workflow_task_poller_behavior._to_bridge_options,
450461
nonsticky_to_sticky_poll_ratio:,
451-
max_concurrent_activity_task_polls:,
462+
activity_task_poller_behavior: activity_task_poller_behavior._to_bridge_options,
452463
# For shutdown to work properly, we must disable remote activities
453464
# ourselves if there are no activities
454465
no_remote_activities: no_remote_activities || activities.empty?,
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# frozen_string_literal: true
2+
3+
module Temporalio
4+
class Worker
5+
# Base class for poller behaviors that control how polling scales.
6+
class PollerBehavior
7+
# @!visibility private
8+
def _to_bridge_options
9+
raise NotImplementedError, 'Subclasses must implement this method'
10+
end
11+
12+
# A poller behavior that attempts to poll as long as a slot is available, up to the
13+
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
14+
class SimpleMaximum < PollerBehavior
15+
# @return [Integer] Maximum number of concurrent poll requests.
16+
attr_reader :maximum
17+
18+
# @param maximum [Integer] Maximum number of concurrent poll requests.
19+
def initialize(maximum)
20+
super()
21+
@maximum = maximum
22+
end
23+
24+
# @!visibility private
25+
def _to_bridge_options
26+
Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: @maximum)
27+
end
28+
end
29+
30+
# A poller behavior that automatically scales the number of pollers based on feedback
31+
# from the server. A slot must be available before beginning polling.
32+
class Autoscaling < PollerBehavior
33+
# @return [Integer] Minimum number of poll calls (assuming slots are available).
34+
attr_reader :minimum
35+
# @return [Integer] Maximum number of poll calls that will ever be open at once.
36+
attr_reader :maximum
37+
# @return [Integer] Number of polls attempted initially before scaling kicks in.
38+
attr_reader :initial
39+
40+
# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
41+
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
42+
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
43+
def initialize(minimum: 1, maximum: 100, initial: 5)
44+
super()
45+
@minimum = minimum
46+
@maximum = maximum
47+
@initial = initial
48+
end
49+
50+
# @!visibility private
51+
def _to_bridge_options
52+
Internal::Bridge::Worker::PollerBehaviorAutoscaling.new(
53+
minimum: @minimum,
54+
maximum: @maximum,
55+
initial: @initial
56+
)
57+
end
58+
end
59+
end
60+
end
61+
end

temporalio/lib/temporalio/worker/workflow_replayer.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require 'temporalio/internal/worker/multi_runner'
88
require 'temporalio/internal/worker/workflow_worker'
99
require 'temporalio/worker/interceptor'
10+
require 'temporalio/worker/poller_behavior'
1011
require 'temporalio/worker/thread_pool'
1112
require 'temporalio/worker/tuner'
1213
require 'temporalio/worker/workflow_executor'
@@ -205,9 +206,11 @@ def initialize(
205206
)._to_bridge_options,
206207
identity_override: options.identity,
207208
max_cached_workflows: 2,
208-
max_concurrent_workflow_task_polls: 2,
209+
workflow_task_poller_behavior:
210+
Temporalio::Worker::PollerBehavior::SimpleMaximum.new(2)._to_bridge_options,
209211
nonsticky_to_sticky_poll_ratio: 1.0,
210-
max_concurrent_activity_task_polls: 1,
212+
activity_task_poller_behavior:
213+
Temporalio::Worker::PollerBehavior::SimpleMaximum.new(1)._to_bridge_options,
211214
no_remote_activities: true,
212215
sticky_queue_schedule_to_start_timeout: 1.0,
213216
max_heartbeat_throttle_interval: 1.0,

temporalio/sig/temporalio/internal/bridge/worker.rbs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ module Temporalio
1010
attr_accessor tuner: TunerOptions
1111
attr_accessor identity_override: String?
1212
attr_accessor max_cached_workflows: Integer
13-
attr_accessor max_concurrent_workflow_task_polls: Integer
13+
attr_accessor workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
1414
attr_accessor nonsticky_to_sticky_poll_ratio: Float
15-
attr_accessor max_concurrent_activity_task_polls: Integer
15+
attr_accessor activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
1616
attr_accessor no_remote_activities: bool
1717
attr_accessor sticky_queue_schedule_to_start_timeout: Float
1818
attr_accessor max_heartbeat_throttle_interval: Float
@@ -32,9 +32,9 @@ module Temporalio
3232
tuner: TunerOptions,
3333
identity_override: String?,
3434
max_cached_workflows: Integer,
35-
max_concurrent_workflow_task_polls: Integer,
35+
workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling,
3636
nonsticky_to_sticky_poll_ratio: Float,
37-
max_concurrent_activity_task_polls: Integer,
37+
activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling,
3838
no_remote_activities: bool,
3939
sticky_queue_schedule_to_start_timeout: Float,
4040
max_heartbeat_throttle_interval: Float,
@@ -142,6 +142,20 @@ module Temporalio
142142
) -> void
143143
end
144144

145+
class PollerBehaviorSimpleMaximum
146+
attr_accessor simple_maximum: Integer
147+
148+
def initialize: (simple_maximum: Integer) -> void
149+
end
150+
151+
class PollerBehaviorAutoscaling
152+
attr_accessor minimum: Integer
153+
attr_accessor maximum: Integer
154+
attr_accessor initial: Integer
155+
156+
def initialize: (minimum: Integer, maximum: Integer, initial: Integer) -> void
157+
end
158+
145159
class WorkflowReplayer
146160
def self.new: (Runtime runtime, Options options) -> [WorkflowReplayer, Worker]
147161

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
module Temporalio
22
class VersioningOverride
3-
def self.auto_upgrade: -> AutoUpgradeVersioningOverride
4-
def self.pinned: (WorkerDeploymentVersion version) -> PinnedVersioningOverride
5-
63
def _to_proto: -> untyped
7-
end
84

9-
class PinnedVersioningOverride < VersioningOverride
10-
attr_reader version: WorkerDeploymentVersion
5+
class Pinned < VersioningOverride
6+
attr_reader version: WorkerDeploymentVersion
117

12-
def initialize: (WorkerDeploymentVersion version) -> void
13-
def _to_proto: -> untyped
14-
end
8+
def initialize: (WorkerDeploymentVersion version) -> void
9+
def _to_proto: -> untyped
10+
end
1511

16-
class AutoUpgradeVersioningOverride < VersioningOverride
17-
def initialize: -> void
18-
def _to_proto: -> untyped
12+
class AutoUpgrade < VersioningOverride
13+
def initialize: -> void
14+
def _to_proto: -> untyped
15+
end
1916
end
2017
end

0 commit comments

Comments
 (0)