Skip to content

Commit b2da34b

Browse files
committed
Review comments
1 parent a6047f7 commit b2da34b

File tree

7 files changed

+65
-89
lines changed

7 files changed

+65
-89
lines changed

temporalio/ext/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,6 @@ fn extract_poller_behavior(poller_behavior: Struct) -> Result<PollerBehavior, Er
620620
initial: poller_behavior.member::<usize>(id!("initial"))?,
621621
}
622622
} else {
623-
PollerBehavior::SimpleMaximum(poller_behavior.member::<usize>(id!("maximum"))?)
623+
PollerBehavior::SimpleMaximum(poller_behavior.member::<usize>(id!("simple_maximum"))?)
624624
})
625625
}

temporalio/lib/temporalio/worker.rb

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ class Worker
5353
:workflow_payload_codec_thread_pool,
5454
:unsafe_workflow_io_enabled,
5555
:deployment_options,
56-
:debug_mode,
5756
:workflow_task_poller_behavior,
58-
:activity_task_poller_behavior
57+
:activity_task_poller_behavior,
58+
:debug_mode
5959
)
6060

6161
# Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details.
@@ -357,6 +357,10 @@ def self.default_illegal_workflow_calls
357357
# with a block for narrower enabling of IO.
358358
# @param deployment_options [DeploymentOptions, nil] Deployment options for the worker.
359359
# WARNING: This is an experimental feature and may change in the future.
360+
# @param workflow_task_poller_behavior [PollerBehavior] Specify the behavior of workflow task
361+
# polling. Defaults to a 5-poller maximum.
362+
# @param activity_task_poller_behavior [PollerBehavior] Specify the behavior of activity task
363+
# polling. Defaults to a 5-poller maximum.
360364
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
361365
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
362366
# `true` or `1`.
@@ -388,8 +392,8 @@ def initialize(
388392
workflow_payload_codec_thread_pool: nil,
389393
unsafe_workflow_io_enabled: false,
390394
deployment_options: Worker.default_deployment_options,
391-
workflow_task_poller_behavior: nil,
392-
activity_task_poller_behavior: nil,
395+
workflow_task_poller_behavior: SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls),
396+
activity_task_poller_behavior: SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls),
393397
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
394398
)
395399
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
@@ -424,9 +428,9 @@ def initialize(
424428
workflow_payload_codec_thread_pool:,
425429
unsafe_workflow_io_enabled:,
426430
deployment_options:,
427-
debug_mode:,
428431
workflow_task_poller_behavior:,
429-
activity_task_poller_behavior:
432+
activity_task_poller_behavior:,
433+
debug_mode:
430434
).freeze
431435

432436
should_enforce_versioning_behavior =
@@ -442,10 +446,6 @@ def initialize(
442446
workflow_failure_exception_types:, workflow_definitions:
443447
)
444448

445-
# Convert deprecated max concurrent polls to poller behaviors if not specified
446-
workflow_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls)
447-
activity_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls)
448-
449449
# Create the bridge worker
450450
@bridge_worker = Internal::Bridge::Worker.new(
451451
client.connection._core_client,

temporalio/lib/temporalio/worker/poller_behavior.rb

Lines changed: 44 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -3,82 +3,58 @@
33
module Temporalio
44
class Worker
55
# Base class for poller behaviors that control how polling scales.
6-
#
7-
# Use factory methods {.simple_maximum} or {.autoscaling} to create instances.
86
class PollerBehavior
9-
# Creates a simple maximum poller behavior
10-
# The poller will attempt to poll as long as a slot is available, up to the
11-
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
12-
#
13-
# @param maximum [Integer] Maximum number of concurrent poll requests.
14-
# @return [SimpleMaximumPollerBehavior] A simple maximum poller behavior
15-
def self.simple_maximum(maximum = 5)
16-
SimpleMaximumPollerBehavior.new(maximum: maximum)
17-
end
18-
19-
# Creates an autoscaling poller behavior
20-
# The poller will automatically scale the number of pollers based on feedback
21-
# from the server. A slot must be available before beginning polling.
22-
#
23-
# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
24-
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
25-
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
26-
# @return [AutoscalingPollerBehavior] An autoscaling poller behavior
27-
def self.autoscaling(minimum: 1, maximum: 100, initial: 5)
28-
AutoscalingPollerBehavior.new(minimum: minimum, maximum: maximum, initial: initial)
29-
end
30-
317
# @!visibility private
328
def _to_bridge_options
339
raise NotImplementedError, 'Subclasses must implement this method'
3410
end
35-
end
36-
37-
# A poller behavior that attempts to poll as long as a slot is available, up to the
38-
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
39-
class SimpleMaximumPollerBehavior < PollerBehavior
40-
# @return [Integer] Maximum number of concurrent poll requests.
41-
attr_reader :maximum
42-
43-
# @param maximum [Integer] Maximum number of concurrent poll requests.
44-
def initialize(maximum: 5)
45-
super()
46-
@maximum = maximum
47-
end
4811

49-
# @!visibility private
50-
def _to_bridge_options
51-
Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: maximum)
52-
end
53-
end
54-
55-
# A poller behavior that automatically scales the number of pollers based on feedback
56-
# from the server. A slot must be available before beginning polling.
57-
class AutoscalingPollerBehavior < PollerBehavior
58-
# @return [Integer] Minimum number of poll calls (assuming slots are available).
59-
attr_reader :minimum
60-
# @return [Integer] Maximum number of poll calls that will ever be open at once.
61-
attr_reader :maximum
62-
# @return [Integer] Number of polls attempted initially before scaling kicks in.
63-
attr_reader :initial
64-
65-
# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
66-
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
67-
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
68-
def initialize(minimum: 1, maximum: 100, initial: 5)
69-
super()
70-
@minimum = minimum
71-
@maximum = maximum
72-
@initial = initial
12+
# A poller behavior that attempts to poll as long as a slot is available, up to the
13+
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
14+
class SimpleMaximum < PollerBehavior
15+
# @return [Integer] Maximum number of concurrent poll requests.
16+
attr_reader :maximum
17+
18+
# @param maximum [Integer] Maximum number of concurrent poll requests.
19+
def initialize(maximum: 5)
20+
super()
21+
@maximum = maximum
22+
end
23+
24+
# @!visibility private
25+
def _to_bridge_options
26+
Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: @maximum)
27+
end
7328
end
7429

75-
# @!visibility private
76-
def _to_bridge_options
77-
Internal::Bridge::Worker::PollerBehaviorAutoscaling.new(
78-
minimum: minimum,
79-
maximum: maximum,
80-
initial: initial
81-
)
30+
# A poller behavior that automatically scales the number of pollers based on feedback
31+
# from the server. A slot must be available before beginning polling.
32+
class Autoscaling < PollerBehavior
33+
# @return [Integer] Minimum number of poll calls (assuming slots are available).
34+
attr_reader :minimum
35+
# @return [Integer] Maximum number of poll calls that will ever be open at once.
36+
attr_reader :maximum
37+
# @return [Integer] Number of polls attempted initially before scaling kicks in.
38+
attr_reader :initial
39+
40+
# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
41+
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
42+
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
43+
def initialize(minimum: 1, maximum: 100, initial: 5)
44+
super()
45+
@minimum = minimum
46+
@maximum = maximum
47+
@initial = initial
48+
end
49+
50+
# @!visibility private
51+
def _to_bridge_options
52+
Internal::Bridge::Worker::PollerBehaviorAutoscaling.new(
53+
minimum: @minimum,
54+
maximum: @maximum,
55+
initial: @initial
56+
)
57+
end
8258
end
8359
end
8460
end

temporalio/lib/temporalio/worker/workflow_replayer.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,11 @@ def initialize(
206206
)._to_bridge_options,
207207
identity_override: options.identity,
208208
max_cached_workflows: 2,
209-
workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(2)._to_bridge_options,
209+
workflow_task_poller_behavior:
210+
Temporalio::Worker::PollerBehavior::SimpleMaximum.new(2)._to_bridge_options,
210211
nonsticky_to_sticky_poll_ratio: 1.0,
211-
activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(1)._to_bridge_options,
212+
activity_task_poller_behavior:
213+
Temporalio::Worker::PollerBehavior::SimpleMaximum.new(1)._to_bridge_options,
212214
no_remote_activities: true,
213215
sticky_queue_schedule_to_start_timeout: 1.0,
214216
max_heartbeat_throttle_interval: 1.0,

temporalio/sig/temporalio/worker.rbs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ module Temporalio
111111
?workflow_failure_exception_types: Array[singleton(Exception)],
112112
?workflow_payload_codec_thread_pool: ThreadPool?,
113113
?unsafe_workflow_io_enabled: bool,
114-
?workflow_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior,
115-
?activity_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior,
114+
?workflow_task_poller_behavior: PollerBehavior,
115+
?activity_task_poller_behavior: PollerBehavior,
116116
?deployment_options: Worker::DeploymentOptions,
117117
?debug_mode: bool
118118
) -> void

temporalio/sig/temporalio/worker/poller_behavior.rbs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
module Temporalio
22
class Worker
33
class PollerBehavior
4-
def self.simple_maximum: (Integer) -> SimpleMaximumPollerBehavior
5-
def self.autoscaling: (?minimum: Integer, ?maximum: Integer, ?initial: Integer) -> AutoscalingPollerBehavior
64
def _to_bridge_options: -> untyped
75
end
86

9-
class SimpleMaximumPollerBehavior < PollerBehavior
7+
class SimpleMaximum < PollerBehavior
108
attr_reader maximum: Integer
119

1210
def initialize: (?maximum: Integer) -> void
1311
def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorSimpleMaximum
1412
end
1513

16-
class AutoscalingPollerBehavior < PollerBehavior
14+
class Autoscaling < PollerBehavior
1715
attr_reader minimum: Integer
1816
attr_reader maximum: Integer
1917
attr_reader initial: Integer

temporalio/test/worker_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,10 @@ def test_can_run_with_autoscaling_poller_behavior
199199
task_queue: "tq-#{SecureRandom.uuid}",
200200
workflows: [WaitOnSignalWorkflow],
201201
activities: [SimpleActivity],
202-
workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.autoscaling(
202+
workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior::Autoscaling.new(
203203
initial: 2
204204
),
205-
activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.autoscaling(
205+
activity_task_poller_behavior: Temporalio::Worker::PollerBehavior::Autoscaling.new(
206206
initial: 2
207207
)
208208
)

0 commit comments

Comments
 (0)