Skip to content

Commit 127835e

Browse files
authored
Expose poller autoscaling options (#830)
1 parent 4933dc5 commit 127835e

File tree

7 files changed

+222
-24
lines changed

7 files changed

+222
-24
lines changed

Diff for: temporalio/bridge/src/worker.rs

+42-11
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ use temporal_sdk_core::api::errors::PollError;
1515
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
1616
use temporal_sdk_core_api::errors::WorkflowErrorType;
1717
use temporal_sdk_core_api::worker::{
18-
PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
19-
SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait,
20-
SlotSupplierPermit,
18+
SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext,
19+
SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit,
2120
};
2221
use temporal_sdk_core_api::Worker;
2322
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
@@ -49,9 +48,9 @@ pub struct WorkerConfig {
4948
identity_override: Option<String>,
5049
max_cached_workflows: usize,
5150
tuner: TunerHolder,
52-
max_concurrent_workflow_task_polls: usize,
51+
workflow_task_poller_behavior: PollerBehavior,
5352
nonsticky_to_sticky_poll_ratio: f32,
54-
max_concurrent_activity_task_polls: usize,
53+
activity_task_poller_behavior: PollerBehavior,
5554
no_remote_activities: bool,
5655
sticky_queue_schedule_to_start_timeout_millis: u64,
5756
max_heartbeat_throttle_interval_millis: u64,
@@ -63,6 +62,42 @@ pub struct WorkerConfig {
6362
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
6463
}
6564

65+
#[derive(FromPyObject)]
66+
pub struct PollerBehaviorSimpleMaximum {
67+
pub simple_maximum: usize,
68+
}
69+
70+
#[derive(FromPyObject)]
71+
pub struct PollerBehaviorAutoscaling {
72+
pub minimum: usize,
73+
pub maximum: usize,
74+
pub initial: usize,
75+
}
76+
77+
/// Recreates [temporal_sdk_core_api::worker::PollerBehavior]
78+
#[derive(FromPyObject)]
79+
pub enum PollerBehavior {
80+
SimpleMaximum(PollerBehaviorSimpleMaximum),
81+
Autoscaling(PollerBehaviorAutoscaling),
82+
}
83+
84+
impl From<PollerBehavior> for temporal_sdk_core_api::worker::PollerBehavior {
85+
fn from(value: PollerBehavior) -> Self {
86+
match value {
87+
PollerBehavior::SimpleMaximum(simple) => {
88+
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum)
89+
}
90+
PollerBehavior::Autoscaling(auto) => {
91+
temporal_sdk_core_api::worker::PollerBehavior::Autoscaling {
92+
minimum: auto.minimum,
93+
maximum: auto.maximum,
94+
initial: auto.initial,
95+
}
96+
}
97+
}
98+
}
99+
}
100+
66101
/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy]
67102
#[derive(FromPyObject)]
68103
pub enum WorkerVersioningStrategy {
@@ -626,14 +661,10 @@ fn convert_worker_config(
626661
.versioning_strategy(converted_versioning_strategy)
627662
.client_identity_override(conf.identity_override)
628663
.max_cached_workflows(conf.max_cached_workflows)
629-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
630-
conf.max_concurrent_workflow_task_polls,
631-
))
664+
.workflow_task_poller_behavior(conf.workflow_task_poller_behavior)
632665
.tuner(Arc::new(converted_tuner))
633666
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
634-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
635-
conf.max_concurrent_activity_task_polls,
636-
))
667+
.activity_task_poller_behavior(conf.activity_task_poller_behavior)
637668
.no_remote_activities(conf.no_remote_activities)
638669
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
639670
conf.sticky_queue_schedule_to_start_timeout_millis,

Diff for: temporalio/bridge/worker.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class WorkerConfig:
4848
identity_override: Optional[str]
4949
max_cached_workflows: int
5050
tuner: TunerHolder
51-
max_concurrent_workflow_task_polls: int
51+
workflow_task_poller_behavior: PollerBehavior
5252
nonsticky_to_sticky_poll_ratio: float
53-
max_concurrent_activity_task_polls: int
53+
activity_task_poller_behavior: PollerBehavior
5454
no_remote_activities: bool
5555
sticky_queue_schedule_to_start_timeout_millis: int
5656
max_heartbeat_throttle_interval_millis: int
@@ -62,6 +62,28 @@ class WorkerConfig:
6262
nondeterminism_as_workflow_fail_for_types: Set[str]
6363

6464

65+
@dataclass
66+
class PollerBehaviorSimpleMaximum:
67+
"""Python representation of the Rust struct for simple poller behavior."""
68+
69+
simple_maximum: int
70+
71+
72+
@dataclass
73+
class PollerBehaviorAutoscaling:
74+
"""Python representation of the Rust struct for autoscaling poller behavior."""
75+
76+
minimum: int
77+
maximum: int
78+
initial: int
79+
80+
81+
PollerBehavior: TypeAlias = Union[
82+
PollerBehaviorSimpleMaximum,
83+
PollerBehaviorAutoscaling,
84+
]
85+
86+
6587
@dataclass
6688
class WorkerDeploymentVersion:
6789
"""Python representation of the Rust struct for configuring a worker deployment version."""

Diff for: temporalio/worker/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
WorkflowSlotInfo,
4444
)
4545
from ._worker import (
46+
PollerBehavior,
47+
PollerBehaviorAutoscaling,
48+
PollerBehaviorSimpleMaximum,
4649
Worker,
4750
WorkerConfig,
4851
WorkerDeploymentConfig,
@@ -65,6 +68,9 @@
6568
"ReplayerConfig",
6669
"WorkflowReplayResult",
6770
"WorkflowReplayResults",
71+
"PollerBehavior",
72+
"PollerBehaviorSimpleMaximum",
73+
"PollerBehaviorAutoscaling",
6874
# Interceptor base classes
6975
"Interceptor",
7076
"ActivityInboundInterceptor",

Diff for: temporalio/worker/_replayer.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,7 @@ def on_eviction_hook(
242242
1
243243
),
244244
),
245-
max_concurrent_workflow_task_polls=1,
246245
nonsticky_to_sticky_poll_ratio=1,
247-
max_concurrent_activity_task_polls=1,
248246
no_remote_activities=True,
249247
sticky_queue_schedule_to_start_timeout_millis=1000,
250248
max_heartbeat_throttle_interval_millis=1000,
@@ -255,6 +253,12 @@ def on_eviction_hook(
255253
versioning_strategy=temporalio.bridge.worker.WorkerVersioningStrategyNone(
256254
build_id=self._config["build_id"] or load_default_build_id(),
257255
),
256+
workflow_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
257+
1
258+
),
259+
activity_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
260+
1
261+
),
258262
),
259263
)
260264
# Start worker

Diff for: temporalio/worker/_worker.py

+82-7
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
Optional,
1919
Sequence,
2020
Type,
21+
Union,
2122
cast,
2223
)
2324

24-
from typing_extensions import TypedDict
25+
from typing_extensions import TypeAlias, TypedDict
2526

2627
import temporalio.activity
2728
import temporalio.api.common.v1
@@ -48,6 +49,48 @@
4849
logger = logging.getLogger(__name__)
4950

5051

52+
@dataclass(frozen=True)
53+
class PollerBehaviorSimpleMaximum:
54+
"""A poller behavior that will attempt to poll as long as a slot is available, up to the
55+
provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
56+
"""
57+
58+
maximum: int = 5
59+
60+
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
61+
return temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
62+
simple_maximum=self.maximum
63+
)
64+
65+
66+
@dataclass(frozen=True)
67+
class PollerBehaviorAutoscaling:
68+
"""A poller behavior that will automatically scale the number of pollers based on feedback
69+
from the server. A slot must be available before beginning polling.
70+
"""
71+
72+
minimum: int = 1
73+
"""At least this many poll calls will always be attempted (assuming slots are available)."""
74+
maximum: int = 100
75+
"""At most this many poll calls will ever be open at once. Must be >= `minimum`."""
76+
initial: int = 5
77+
"""This many polls will be attempted initially before scaling kicks in. Must be between
78+
`minimum` and `maximum`."""
79+
80+
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
81+
return temporalio.bridge.worker.PollerBehaviorAutoscaling(
82+
minimum=self.minimum,
83+
maximum=self.maximum,
84+
initial=self.initial,
85+
)
86+
87+
88+
PollerBehavior: TypeAlias = Union[
89+
PollerBehaviorSimpleMaximum,
90+
PollerBehaviorAutoscaling,
91+
]
92+
93+
5194
class Worker:
5295
"""Worker to process workflows and/or activities.
5396
@@ -76,9 +119,9 @@ def __init__(
76119
max_concurrent_activities: Optional[int] = None,
77120
max_concurrent_local_activities: Optional[int] = None,
78121
tuner: Optional[WorkerTuner] = None,
79-
max_concurrent_workflow_task_polls: int = 5,
122+
max_concurrent_workflow_task_polls: Optional[int] = None,
80123
nonsticky_to_sticky_poll_ratio: float = 0.2,
81-
max_concurrent_activity_task_polls: int = 5,
124+
max_concurrent_activity_task_polls: Optional[int] = None,
82125
no_remote_activities: bool = False,
83126
sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
84127
max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
@@ -94,6 +137,12 @@ def __init__(
94137
use_worker_versioning: bool = False,
95138
disable_safe_workflow_eviction: bool = False,
96139
deployment_config: Optional[WorkerDeploymentConfig] = None,
140+
workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
141+
maximum=5
142+
),
143+
activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
144+
maximum=5
145+
),
97146
) -> None:
98147
"""Create a worker to process workflows and/or activities.
99148
@@ -152,10 +201,17 @@ def __init__(
152201
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
153202
``max_concurrent_local_activities`` arguments.
154203
204+
Defaults to fixed-size 100 slots for each slot kind if unset and none of the
205+
max_* arguments are provided.
206+
155207
WARNING: This argument is experimental
156208
max_concurrent_workflow_task_polls: Maximum number of concurrent
157209
poll workflow task requests we will perform at a time on this
158210
worker's task queue.
211+
212+
If set, will override any value passed to ``workflow_task_poller_behavior``.
213+
214+
WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead
159215
nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
160216
this number = the number of max pollers that will be allowed for
161217
the nonsticky queue when sticky tasks are enabled. If both
@@ -166,6 +222,10 @@ def __init__(
166222
max_concurrent_activity_task_polls: Maximum number of concurrent
167223
poll activity task requests we will perform at a time on this
168224
worker's task queue.
225+
226+
If set, will override any value passed to ``activity_task_poller_behavior``.
227+
228+
WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
169229
no_remote_activities: If true, this worker will only handle workflow
170230
tasks and local activities, it will not poll for activity tasks.
171231
sticky_queue_schedule_to_start_timeout: How long a workflow task is
@@ -231,6 +291,10 @@ def __init__(
231291
deployment_config: Deployment config for the worker. Exclusive with `build_id` and
232292
`use_worker_versioning`.
233293
WARNING: This is an experimental feature and may change in the future.
294+
workflow_task_poller_behavior: Specify the behavior of workflow task polling.
295+
Defaults to a 5-poller maximum.
296+
activity_task_poller_behavior: Specify the behavior of activity task polling.
297+
Defaults to a 5-poller maximum.
234298
"""
235299
if not activities and not workflows:
236300
raise ValueError("At least one activity or workflow must be specified")
@@ -393,6 +457,15 @@ def __init__(
393457
build_id=build_id
394458
)
395459

460+
if max_concurrent_workflow_task_polls:
461+
workflow_task_poller_behavior = PollerBehaviorSimpleMaximum(
462+
maximum=max_concurrent_workflow_task_polls
463+
)
464+
if max_concurrent_activity_task_polls:
465+
activity_task_poller_behavior = PollerBehaviorSimpleMaximum(
466+
maximum=max_concurrent_activity_task_polls
467+
)
468+
396469
# Create bridge worker last. We have empirically observed that if it is
397470
# created before an error is raised from the activity worker
398471
# constructor, a deadlock/hang will occur presumably while trying to
@@ -408,9 +481,7 @@ def __init__(
408481
identity_override=identity,
409482
max_cached_workflows=max_cached_workflows,
410483
tuner=bridge_tuner,
411-
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
412484
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
413-
max_concurrent_activity_task_polls=max_concurrent_activity_task_polls,
414485
# We have to disable remote activities if a user asks _or_ if we
415486
# are not running an activity worker at all. Otherwise shutdown
416487
# will not proceed properly.
@@ -440,6 +511,8 @@ def __init__(
440511
else set()
441512
),
442513
versioning_strategy=versioning_strategy,
514+
workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(),
515+
activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(),
443516
),
444517
)
445518

@@ -696,9 +769,9 @@ class WorkerConfig(TypedDict, total=False):
696769
max_concurrent_activities: Optional[int]
697770
max_concurrent_local_activities: Optional[int]
698771
tuner: Optional[WorkerTuner]
699-
max_concurrent_workflow_task_polls: int
772+
max_concurrent_workflow_task_polls: Optional[int]
700773
nonsticky_to_sticky_poll_ratio: float
701-
max_concurrent_activity_task_polls: int
774+
max_concurrent_activity_task_polls: Optional[int]
702775
no_remote_activities: bool
703776
sticky_queue_schedule_to_start_timeout: timedelta
704777
max_heartbeat_throttle_interval: timedelta
@@ -714,6 +787,8 @@ class WorkerConfig(TypedDict, total=False):
714787
use_worker_versioning: bool
715788
disable_safe_workflow_eviction: bool
716789
deployment_config: Optional[WorkerDeploymentConfig]
790+
workflow_task_poller_behavior: PollerBehavior
791+
activity_task_poller_behavior: PollerBehavior
717792

718793

719794
@dataclass

0 commit comments

Comments
 (0)