Skip to content

Commit 6071f34

Browse files
committed
Add priority annotations
1 parent 2d8d800 commit 6071f34

26 files changed

Lines changed: 250 additions & 29 deletions

File tree

temporalio/lib/temporalio/activity/info.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Activity
1010
:heartbeat_details,
1111
:heartbeat_timeout,
1212
:local?,
13+
:priority,
1314
:schedule_to_close_timeout,
1415
:scheduled_time,
1516
:start_to_close_timeout,
@@ -38,6 +39,8 @@ module Activity
3839
# @return [Float, nil] Heartbeat timeout set by the caller.
3940
# @!attribute local?
4041
# @return [Boolean] Whether the activity is a local activity or not.
42+
# @!attribute priority
43+
# @return [Priority, nil] The priority of this activity.
4144
# @!attribute schedule_to_close_timeout
4245
# @return [Float, nil] Schedule to close timeout set by the caller.
4346
# @!attribute scheduled_time

temporalio/lib/temporalio/client.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
require 'temporalio/converters'
2020
require 'temporalio/error'
2121
require 'temporalio/internal/client/implementation'
22+
require 'temporalio/priority'
2223
require 'temporalio/retry_policy'
2324
require 'temporalio/runtime'
2425
require 'temporalio/search_attributes'
@@ -217,6 +218,9 @@ def operator_service
217218
# with `cron_schedule`.
218219
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
219220
# server to start it on a local worker running with this same client. This is currently experimental.
221+
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
222+
# This is currently experimental.
223+
# @param priority [Priority] Priority of the workflow. This is currently experimental.
220224
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
221225
#
222226
# @return [WorkflowHandle] A workflow handle to the started workflow.
@@ -241,6 +245,7 @@ def start_workflow(
241245
start_delay: nil,
242246
request_eager_start: false,
243247
versioning_override: nil,
248+
priority: Priority.default,
244249
rpc_options: nil
245250
)
246251
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
@@ -263,6 +268,7 @@ def start_workflow(
263268
request_eager_start:,
264269
headers: {},
265270
versioning_override:,
271+
priority:,
266272
rpc_options:
267273
))
268274
end
@@ -295,6 +301,9 @@ def start_workflow(
295301
# with `cron_schedule`.
296302
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
297303
# server to start it on a local worker running with this same client. This is currently experimental.
304+
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
305+
# This is currently experimental.
306+
# @param priority [Priority] Priority for the workflow. This is currently experimental.
298307
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
299308
#
300309
# @return [Object] Successful result of the workflow.
@@ -320,6 +329,7 @@ def execute_workflow(
320329
start_delay: nil,
321330
request_eager_start: false,
322331
versioning_override: nil,
332+
priority: Priority.default,
323333
follow_runs: true,
324334
rpc_options: nil
325335
)
@@ -342,6 +352,7 @@ def execute_workflow(
342352
start_delay:,
343353
request_eager_start:,
344354
versioning_override:,
355+
priority:,
345356
rpc_options:
346357
)
347358
follow_runs ? handle.result : handle

temporalio/lib/temporalio/client/interceptor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def intercept_client(next_interceptor)
3737
:start_delay,
3838
:request_eager_start,
3939
:headers,
40+
:priority,
4041
:versioning_override,
4142
:rpc_options
4243
)

temporalio/lib/temporalio/internal/client/implementation.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def start_workflow(input)
7070
input.static_summary, input.static_details, @client.data_converter
7171
),
7272
header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter),
73+
priority: input.priority&._to_proto,
7374
versioning_override: input.versioning_override&._to_proto
7475
)
7576

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def execute_activity(task_token, defn, start)
174174
),
175175
heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout),
176176
local?: start.is_local,
177+
priority: Priority._from_proto(start.priority),
177178
schedule_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.schedule_to_close_timeout),
178179
scheduled_time: Internal::ProtoUtils.timestamp_to_time(start.scheduled_time) || raise, # Never nil
179180
start_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.start_to_close_timeout),

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def initialize(details)
140140
workflow_id: @init_job.parent_workflow_info.workflow_id
141141
)
142142
end,
143+
priority: Priority._from_proto(@init_job.priority),
143144
retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
144145
root: if @init_job.root_workflow
145146
Workflow::Info::RootInfo.new(

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ def execute_activity(
7575
cancellation:,
7676
cancellation_type:,
7777
activity_id:,
78-
disable_eager_execution:
78+
disable_eager_execution:,
79+
priority:
7980
)
8081
activity = case activity
8182
when Class
@@ -102,6 +103,7 @@ def execute_activity(
102103
cancellation_type:,
103104
activity_id:,
104105
disable_eager_execution: disable_eager_execution || @instance.disable_eager_activity_execution,
106+
priority:,
105107
headers: {}
106108
)
107109
)
@@ -249,7 +251,8 @@ def start_child_workflow(
249251
retry_policy:,
250252
cron_schedule:,
251253
memo:,
252-
search_attributes:
254+
search_attributes:,
255+
priority:
253256
)
254257
@outbound.start_child_workflow(
255258
Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new(
@@ -270,6 +273,7 @@ def start_child_workflow(
270273
cron_schedule:,
271274
memo:,
272275
search_attributes:,
276+
priority:,
273277
headers: {}
274278
)
275279
)

temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ def execute_activity(input)
7373
heartbeat_timeout: ProtoUtils.seconds_to_duration(input.heartbeat_timeout),
7474
retry_policy: input.retry_policy&._to_proto,
7575
cancellation_type: input.cancellation_type,
76-
do_not_eagerly_execute: input.disable_eager_execution
76+
do_not_eagerly_execute: input.disable_eager_execution,
77+
priority: input.priority&._to_proto
7778
),
7879
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
7980
)
@@ -337,7 +338,8 @@ def start_child_workflow(input)
337338
headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter),
338339
memo: ProtoUtils.memo_to_proto_hash(input.memo, @instance.payload_converter),
339340
search_attributes: input.search_attributes&._to_proto_hash,
340-
cancellation_type: input.cancellation_type
341+
cancellation_type: input.cancellation_type,
342+
priority: input.priority&._to_proto
341343
),
342344
user_metadata: ProtoUtils.to_user_metadata(
343345
input.static_summary, input.static_details, @instance.payload_converter
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/api'
4+
5+
module Temporalio
6+
# Priority contains metadata that controls relative ordering of task processing when tasks are
7+
# backlogged in a queue. Initially, Priority will be used in activity and workflow task
8+
# queues, which are typically where backlogs exist. Priority is (for now) attached to
9+
# workflows and activities. Activities and child workflows inherit Priority from the workflow
10+
# that created them, but may override fields when they are started or modified. For each field
11+
# of a Priority on an activity/workflow, not present or equal to zero/empty string means to
12+
# inherit the value from the calling workflow, or if there is no calling workflow, then use
13+
# the default (documented on the field).
14+
#
15+
# The overall semantics of Priority are:
16+
# 1. First, consider "priority_key": lower number goes first.
17+
# (more will be added here later).
18+
class Priority
19+
# The priority key, which is a positive integer from 1 to n, where smaller integers
20+
# correspond to higher priorities (tasks run sooner). In general, tasks in a queue should
21+
# be processed in close to priority order, although small deviations are possible. The
22+
# maximum priority value (minimum priority) is determined by server configuration, and
23+
# defaults to 5.
24+
#
25+
# The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes
26+
# out to 3.
27+
#
28+
# @return [Integer, nil] The priority key
29+
attr_reader :priority_key
30+
31+
# @!visibility private
32+
def self._from_proto(priority)
33+
return default if priority.nil?
34+
35+
new(priority_key: priority.priority_key.zero? ? nil : priority.priority_key)
36+
end
37+
38+
# The default priority instance.
39+
#
40+
# @return [Priority] The default priority
41+
def self.default
42+
@default ||= new
43+
end
44+
45+
# Create a new Priority instance
46+
#
47+
# @param priority_key [Integer, nil] The priority key
48+
def initialize(priority_key: nil)
49+
@priority_key = priority_key
50+
end
51+
52+
# @!visibility private
53+
def _to_proto
54+
return nil if priority_key.nil?
55+
56+
Temporalio::Api::Common::V1::Priority.new(priority_key: priority_key || 0)
57+
end
58+
59+
# @return [Boolean] True if this priority is empty/default
60+
def empty?
61+
priority_key.nil?
62+
end
63+
end
64+
end

temporalio/lib/temporalio/testing/activity_environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def self.default_info
2424
heartbeat_details: [],
2525
heartbeat_timeout: nil,
2626
local?: false,
27+
priority: Temporalio::Priority.default,
2728
schedule_to_close_timeout: 1.0,
2829
scheduled_time: Time.at(0),
2930
start_to_close_timeout: 1.0,

0 commit comments

Comments
 (0)