Skip to content

Commit 196e0ac

Browse files
authored
Priority annotations (#274)
1 parent 644bb8c commit 196e0ac

File tree

28 files changed

+249
-31
lines changed

28 files changed

+249
-31
lines changed

temporalio/lib/temporalio/activity.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require 'temporalio/activity/context'
55
require 'temporalio/activity/definition'
66
require 'temporalio/activity/info'
7+
require 'temporalio/priority'
78

89
module Temporalio
910
# All activity related classes.

temporalio/lib/temporalio/activity/info.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Activity
1010
:heartbeat_details,
1111
:heartbeat_timeout,
1212
:local?,
13+
:priority,
1314
:schedule_to_close_timeout,
1415
:scheduled_time,
1516
:start_to_close_timeout,
@@ -38,6 +39,8 @@ module Activity
3839
# @return [Float, nil] Heartbeat timeout set by the caller.
3940
# @!attribute local?
4041
# @return [Boolean] Whether the activity is a local activity or not.
42+
# @!attribute priority
43+
# @return [Priority] The priority of this activity.
4144
# @!attribute schedule_to_close_timeout
4245
# @return [Float, nil] Schedule to close timeout set by the caller.
4346
# @!attribute scheduled_time

temporalio/lib/temporalio/client.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
require 'temporalio/converters'
2020
require 'temporalio/error'
2121
require 'temporalio/internal/client/implementation'
22+
require 'temporalio/priority'
2223
require 'temporalio/retry_policy'
2324
require 'temporalio/runtime'
2425
require 'temporalio/search_attributes'
@@ -217,6 +218,9 @@ def operator_service
217218
# with `cron_schedule`.
218219
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
219220
# server to start it on a local worker running with this same client. This is currently experimental.
221+
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
222+
# This is currently experimental.
223+
# @param priority [Priority] Priority of the workflow. This is currently experimental.
220224
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
221225
#
222226
# @return [WorkflowHandle] A workflow handle to the started workflow.
@@ -241,6 +245,7 @@ def start_workflow(
241245
start_delay: nil,
242246
request_eager_start: false,
243247
versioning_override: nil,
248+
priority: Priority.default,
244249
rpc_options: nil
245250
)
246251
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
@@ -263,6 +268,7 @@ def start_workflow(
263268
request_eager_start:,
264269
headers: {},
265270
versioning_override:,
271+
priority:,
266272
rpc_options:
267273
))
268274
end
@@ -295,6 +301,9 @@ def start_workflow(
295301
# with `cron_schedule`.
296302
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
297303
# server to start it on a local worker running with this same client. This is currently experimental.
304+
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
305+
# This is currently experimental.
306+
# @param priority [Priority] Priority for the workflow. This is currently experimental.
298307
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
299308
#
300309
# @return [Object] Successful result of the workflow.
@@ -320,6 +329,7 @@ def execute_workflow(
320329
start_delay: nil,
321330
request_eager_start: false,
322331
versioning_override: nil,
332+
priority: Priority.default,
323333
follow_runs: true,
324334
rpc_options: nil
325335
)
@@ -342,6 +352,7 @@ def execute_workflow(
342352
start_delay:,
343353
request_eager_start:,
344354
versioning_override:,
355+
priority:,
345356
rpc_options:
346357
)
347358
follow_runs ? handle.result : handle

temporalio/lib/temporalio/client/interceptor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def intercept_client(next_interceptor)
3838
:request_eager_start,
3939
:headers,
4040
:versioning_override,
41+
:priority,
4142
:rpc_options
4243
)
4344

temporalio/lib/temporalio/internal/client/implementation.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def start_workflow(input)
7070
input.static_summary, input.static_details, @client.data_converter
7171
),
7272
header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter),
73+
priority: input.priority._to_proto,
7374
versioning_override: input.versioning_override&._to_proto
7475
)
7576

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def execute_activity(task_token, defn, start)
174174
),
175175
heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout),
176176
local?: start.is_local,
177+
priority: Priority._from_proto(start.priority),
177178
schedule_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.schedule_to_close_timeout),
178179
scheduled_time: Internal::ProtoUtils.timestamp_to_time(start.scheduled_time) || raise, # Never nil
179180
start_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.start_to_close_timeout),

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def initialize(details)
140140
workflow_id: @init_job.parent_workflow_info.workflow_id
141141
)
142142
end,
143+
priority: Priority._from_proto(@init_job.priority),
143144
retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
144145
root: if @init_job.root_workflow
145146
Workflow::Info::RootInfo.new(

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ def execute_activity(
7575
cancellation:,
7676
cancellation_type:,
7777
activity_id:,
78-
disable_eager_execution:
78+
disable_eager_execution:,
79+
priority:
7980
)
8081
activity = case activity
8182
when Class
@@ -102,6 +103,7 @@ def execute_activity(
102103
cancellation_type:,
103104
activity_id:,
104105
disable_eager_execution: disable_eager_execution || @instance.disable_eager_activity_execution,
106+
priority:,
105107
headers: {}
106108
)
107109
)
@@ -249,7 +251,8 @@ def start_child_workflow(
249251
retry_policy:,
250252
cron_schedule:,
251253
memo:,
252-
search_attributes:
254+
search_attributes:,
255+
priority:
253256
)
254257
@outbound.start_child_workflow(
255258
Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new(
@@ -270,6 +273,7 @@ def start_child_workflow(
270273
cron_schedule:,
271274
memo:,
272275
search_attributes:,
276+
priority:,
273277
headers: {}
274278
)
275279
)

temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ def execute_activity(input)
7373
heartbeat_timeout: ProtoUtils.seconds_to_duration(input.heartbeat_timeout),
7474
retry_policy: input.retry_policy&._to_proto,
7575
cancellation_type: input.cancellation_type,
76-
do_not_eagerly_execute: input.disable_eager_execution
76+
do_not_eagerly_execute: input.disable_eager_execution,
77+
priority: input.priority._to_proto
7778
),
7879
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
7980
)
@@ -337,7 +338,8 @@ def start_child_workflow(input)
337338
headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter),
338339
memo: ProtoUtils.memo_to_proto_hash(input.memo, @instance.payload_converter),
339340
search_attributes: input.search_attributes&._to_proto_hash,
340-
cancellation_type: input.cancellation_type
341+
cancellation_type: input.cancellation_type,
342+
priority: input.priority._to_proto
341343
),
342344
user_metadata: ProtoUtils.to_user_metadata(
343345
input.static_summary, input.static_details, @instance.payload_converter

temporalio/lib/temporalio/priority.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/api'
4+
5+
module Temporalio
6+
Priority = Data.define(
7+
:priority_key
8+
)
9+
10+
# Priority contains metadata that controls relative ordering of task processing when tasks are
11+
# backlogged in a queue. Initially, Priority will be used in activity and workflow task
12+
# queues, which are typically where backlogs exist. Priority is (for now) attached to
13+
# workflows and activities. Activities and child workflows inherit Priority from the workflow
14+
# that created them, but may override fields when they are started or modified. For each field
15+
# of a Priority on an activity/workflow, not present or equal to zero/empty string means to
16+
# inherit the value from the calling workflow, or if there is no calling workflow, then use
17+
# the default (documented on the field).
18+
#
19+
# The overall semantics of Priority are:
20+
# 1. First, consider "priority_key": lower number goes first.
21+
# (more will be added here later).
22+
#
23+
# @!attribute priority_key
24+
# @return [Integer, nil] The priority key, which is a positive integer from 1 to n, where
25+
# smaller integers correspond to higher priorities (tasks run sooner). In general, tasks in a
26+
# queue should be processed in close to priority order, although small deviations are possible.
27+
# The maximum priority value (minimum priority) is determined by server configuration, and
28+
# defaults to 5.
29+
#
30+
# The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes
31+
# out to 3.
32+
class Priority
33+
# @!visibility private
34+
def self._from_proto(priority)
35+
return default if priority.nil?
36+
37+
new(priority_key: priority.priority_key.zero? ? nil : priority.priority_key)
38+
end
39+
40+
# The default priority instance.
41+
#
42+
# @return [Priority] The default priority
43+
def self.default
44+
@default ||= new(priority_key: nil)
45+
end
46+
47+
# @!visibility private
48+
def _to_proto
49+
return nil if priority_key.nil?
50+
51+
Temporalio::Api::Common::V1::Priority.new(priority_key: priority_key || 0)
52+
end
53+
54+
# @return [Boolean] True if this priority is empty/default
55+
def empty?
56+
priority_key.nil?
57+
end
58+
end
59+
end

temporalio/lib/temporalio/testing/activity_environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def self.default_info
2424
heartbeat_details: [],
2525
heartbeat_timeout: nil,
2626
local?: false,
27+
priority: Temporalio::Priority.default,
2728
schedule_to_close_timeout: 1.0,
2829
scheduled_time: Time.at(0),
2930
start_to_close_timeout: 1.0,

temporalio/lib/temporalio/worker/interceptor.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ def handle_update(input)
216216
:cancellation_type,
217217
:activity_id,
218218
:disable_eager_execution,
219-
:headers
219+
:headers,
220+
:priority
220221
)
221222

222223
# Input for {Outbound.execute_local_activity}.
@@ -284,7 +285,8 @@ def handle_update(input)
284285
:cron_schedule,
285286
:memo,
286287
:search_attributes,
287-
:headers
288+
:headers,
289+
:priority
288290
)
289291

290292
# Outbound interceptor for intercepting outbound workflow calls. This should be extended by users needing to

temporalio/lib/temporalio/worker/workflow_replayer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def initialize(
205205
)._to_bridge_options,
206206
identity_override: options.identity,
207207
max_cached_workflows: 2,
208-
max_concurrent_workflow_task_polls: 1,
208+
max_concurrent_workflow_task_polls: 2,
209209
nonsticky_to_sticky_poll_ratio: 1.0,
210210
max_concurrent_activity_task_polls: 1,
211211
no_remote_activities: true,

temporalio/lib/temporalio/workflow.rb

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
require 'random/formatter'
44
require 'temporalio/error'
5+
require 'temporalio/priority'
56
require 'temporalio/workflow/activity_cancellation_type'
67
require 'temporalio/workflow/child_workflow_cancellation_type'
78
require 'temporalio/workflow/child_workflow_handle'
@@ -130,6 +131,7 @@ def self.deprecate_patch(patch_id)
130131
# optimization on some servers that sends activities back to the same worker as the calling workflow if they can
131132
# run there. If `false` (the default), eager execution may still be disabled at the worker level or may not be
132133
# requested due to lack of available slots.
134+
# @param priority [Priority] Priority of the activity. This is currently experimental.
133135
#
134136
# @return [Object] Result of the activity.
135137
# @raise [Error::ActivityError] Activity failed (and retry was disabled or exhausted).
@@ -148,12 +150,14 @@ def self.execute_activity(
148150
cancellation: Workflow.cancellation,
149151
cancellation_type: ActivityCancellationType::TRY_CANCEL,
150152
activity_id: nil,
151-
disable_eager_execution: false
153+
disable_eager_execution: false,
154+
priority: Priority.default
152155
)
153156
_current.execute_activity(
154157
activity, *args,
155158
task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
156-
heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:
159+
heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:,
160+
priority:
157161
)
158162
end
159163

@@ -175,13 +179,14 @@ def self.execute_child_workflow(
175179
retry_policy: nil,
176180
cron_schedule: nil,
177181
memo: nil,
178-
search_attributes: nil
182+
search_attributes: nil,
183+
priority: Priority.default
179184
)
180185
start_child_workflow(
181186
workflow, *args,
182187
id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
183188
parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
184-
retry_policy:, cron_schedule:, memo:, search_attributes:
189+
retry_policy:, cron_schedule:, memo:, search_attributes:, priority:
185190
).result
186191
end
187192

@@ -372,6 +377,7 @@ def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation)
372377
# @param cron_schedule [String, nil] Cron schedule. Users should use schedules instead of this.
373378
# @param memo [Hash{String, Symbol => Object}, nil] Memo for the workflow.
374379
# @param search_attributes [SearchAttributes, nil] Search attributes for the workflow.
380+
# @param priority [Priority] Priority of the workflow. This is currently experimental.
375381
#
376382
# @return [ChildWorkflowHandle] Workflow handle to the started workflow.
377383
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists for the ID.
@@ -393,13 +399,14 @@ def self.start_child_workflow(
393399
retry_policy: nil,
394400
cron_schedule: nil,
395401
memo: nil,
396-
search_attributes: nil
402+
search_attributes: nil,
403+
priority: Priority.default
397404
)
398405
_current.start_child_workflow(
399406
workflow, *args,
400407
id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
401408
parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
402-
retry_policy:, cron_schedule:, memo:, search_attributes:
409+
retry_policy:, cron_schedule:, memo:, search_attributes:, priority:
403410
)
404411
end
405412

temporalio/lib/temporalio/workflow/info.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Workflow
1212
:last_result,
1313
:namespace,
1414
:parent,
15+
:priority,
1516
:retry_policy,
1617
:root,
1718
:run_id,
@@ -44,6 +45,8 @@ module Workflow
4445
# @return [String] Namespace for the workflow.
4546
# @!attribute parent
4647
# @return [ParentInfo, nil] Parent information for the workflow if this is a child.
48+
# @!attribute priority
49+
# @return [Priority] The priority of this workflow.
4750
# @!attribute retry_policy
4851
# @return [RetryPolicy, nil] Retry policy for the workflow.
4952
# @!attribute root

temporalio/sig/temporalio/activity/info.rbs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ module Temporalio
88
attr_reader heartbeat_details: Array[Object?]
99
attr_reader heartbeat_timeout: Float?
1010
attr_reader local?: bool
11+
attr_reader priority: Temporalio::Priority
1112
attr_reader schedule_to_close_timeout: Float?
1213
attr_reader scheduled_time: Time
1314
attr_reader start_to_close_timeout: Float?
@@ -27,6 +28,7 @@ module Temporalio
2728
heartbeat_details: Array[Object?],
2829
heartbeat_timeout: Float?,
2930
local?: bool,
31+
priority: Temporalio::Priority?,
3032
schedule_to_close_timeout: Float?,
3133
scheduled_time: Time,
3234
start_to_close_timeout: Float?,
@@ -42,4 +44,4 @@ module Temporalio
4244
def with: (**untyped) -> Info
4345
end
4446
end
45-
end
47+
end

0 commit comments

Comments
 (0)