Skip to content

Priority annotations #274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'temporalio/activity/context'
require 'temporalio/activity/definition'
require 'temporalio/activity/info'
require 'temporalio/priority'

module Temporalio
# All activity related classes.
Expand Down
3 changes: 3 additions & 0 deletions temporalio/lib/temporalio/activity/info.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Activity
:heartbeat_details,
:heartbeat_timeout,
:local?,
:priority,
:schedule_to_close_timeout,
:scheduled_time,
:start_to_close_timeout,
Expand Down Expand Up @@ -38,6 +39,8 @@ module Activity
# @return [Float, nil] Heartbeat timeout set by the caller.
# @!attribute local?
# @return [Boolean] Whether the activity is a local activity or not.
# @!attribute priority
# @return [Priority] The priority of this activity.
# @!attribute schedule_to_close_timeout
# @return [Float, nil] Schedule to close timeout set by the caller.
# @!attribute scheduled_time
Expand Down
11 changes: 11 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'temporalio/converters'
require 'temporalio/error'
require 'temporalio/internal/client/implementation'
require 'temporalio/priority'
require 'temporalio/retry_policy'
require 'temporalio/runtime'
require 'temporalio/search_attributes'
Expand Down Expand Up @@ -217,6 +218,9 @@ def operator_service
# with `cron_schedule`.
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
# server to start it on a local worker running with this same client. This is currently experimental.
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
# This is currently experimental.
# @param priority [Priority] Priority of the workflow. This is currently experimental.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowHandle] A workflow handle to the started workflow.
Expand All @@ -241,6 +245,7 @@ def start_workflow(
start_delay: nil,
request_eager_start: false,
versioning_override: nil,
priority: Priority.default,
rpc_options: nil
)
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
Expand All @@ -263,6 +268,7 @@ def start_workflow(
request_eager_start:,
headers: {},
versioning_override:,
priority:,
rpc_options:
))
end
Expand Down Expand Up @@ -295,6 +301,9 @@ def start_workflow(
# with `cron_schedule`.
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
# server to start it on a local worker running with this same client. This is currently experimental.
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
# This is currently experimental.
# @param priority [Priority] Priority for the workflow. This is currently experimental.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Object] Successful result of the workflow.
Expand All @@ -320,6 +329,7 @@ def execute_workflow(
start_delay: nil,
request_eager_start: false,
versioning_override: nil,
priority: Priority.default,
follow_runs: true,
rpc_options: nil
)
Expand All @@ -342,6 +352,7 @@ def execute_workflow(
start_delay:,
request_eager_start:,
versioning_override:,
priority:,
rpc_options:
)
follow_runs ? handle.result : handle
Expand Down
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def intercept_client(next_interceptor)
:request_eager_start,
:headers,
:versioning_override,
:priority,
:rpc_options
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def start_workflow(input)
input.static_summary, input.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter),
priority: input.priority._to_proto,
versioning_override: input.versioning_override&._to_proto
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def execute_activity(task_token, defn, start)
),
heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout),
local?: start.is_local,
priority: Priority._from_proto(start.priority),
schedule_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.schedule_to_close_timeout),
scheduled_time: Internal::ProtoUtils.timestamp_to_time(start.scheduled_time) || raise, # Never nil
start_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.start_to_close_timeout),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def initialize(details)
workflow_id: @init_job.parent_workflow_info.workflow_id
)
end,
priority: Priority._from_proto(@init_job.priority),
retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
root: if @init_job.root_workflow
Workflow::Info::RootInfo.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def execute_activity(
cancellation:,
cancellation_type:,
activity_id:,
disable_eager_execution:
disable_eager_execution:,
priority:
)
activity = case activity
when Class
Expand All @@ -102,6 +103,7 @@ def execute_activity(
cancellation_type:,
activity_id:,
disable_eager_execution: disable_eager_execution || @instance.disable_eager_activity_execution,
priority:,
headers: {}
)
)
Expand Down Expand Up @@ -249,7 +251,8 @@ def start_child_workflow(
retry_policy:,
cron_schedule:,
memo:,
search_attributes:
search_attributes:,
priority:
)
@outbound.start_child_workflow(
Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new(
Expand All @@ -270,6 +273,7 @@ def start_child_workflow(
cron_schedule:,
memo:,
search_attributes:,
priority:,
headers: {}
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def execute_activity(input)
heartbeat_timeout: ProtoUtils.seconds_to_duration(input.heartbeat_timeout),
retry_policy: input.retry_policy&._to_proto,
cancellation_type: input.cancellation_type,
do_not_eagerly_execute: input.disable_eager_execution
do_not_eagerly_execute: input.disable_eager_execution,
priority: input.priority._to_proto
),
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
)
Expand Down Expand Up @@ -337,7 +338,8 @@ def start_child_workflow(input)
headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter),
memo: ProtoUtils.memo_to_proto_hash(input.memo, @instance.payload_converter),
search_attributes: input.search_attributes&._to_proto_hash,
cancellation_type: input.cancellation_type
cancellation_type: input.cancellation_type,
priority: input.priority._to_proto
),
user_metadata: ProtoUtils.to_user_metadata(
input.static_summary, input.static_details, @instance.payload_converter
Expand Down
59 changes: 59 additions & 0 deletions temporalio/lib/temporalio/priority.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# frozen_string_literal: true

require 'temporalio/api'

module Temporalio
Priority = Data.define(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to default a field for this you can still leave the initialize the way you had it in the class and just call super, e.g.

def initialize(priority_key: nil)
  super
end

But requiring it is fine too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, honestly requiring it makes more sense

:priority_key
)

# Priority contains metadata that controls relative ordering of task processing when tasks are
# backlogged in a queue. Initially, Priority will be used in activity and workflow task
# queues, which are typically where backlogs exist. Priority is (for now) attached to
# workflows and activities. Activities and child workflows inherit Priority from the workflow
# that created them, but may override fields when they are started or modified. For each field
# of a Priority on an activity/workflow, not present or equal to zero/empty string means to
# inherit the value from the calling workflow, or if there is no calling workflow, then use
# the default (documented on the field).
#
# The overall semantics of Priority are:
# 1. First, consider "priority_key": lower number goes first.
# (more will be added here later).
#
# @!attribute priority_key
# @return [Integer, nil] The priority key, which is a positive integer from 1 to n, where
# smaller integers correspond to higher priorities (tasks run sooner). In general, tasks in a
# queue should be processed in close to priority order, although small deviations are possible.
# The maximum priority value (minimum priority) is determined by server configuration, and
# defaults to 5.
#
# The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes
# out to 3.
class Priority
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this should be a Data class (and get all of the goodies it provides). I think it makes sense like record did for .NET and @dataclass did for Python. Still will need much of what you have here (so after the Data.define, re-open the class Priority to add stuff).

# @!visibility private
def self._from_proto(priority)
return default if priority.nil?

new(priority_key: priority.priority_key.zero? ? nil : priority.priority_key)
end

# The default priority instance.
#
# @return [Priority] The default priority
def self.default
@default ||= new(priority_key: nil)
end

# @!visibility private
def _to_proto
return nil if priority_key.nil?

Temporalio::Api::Common::V1::Priority.new(priority_key: priority_key || 0)
end

# @return [Boolean] True if this priority is empty/default
def empty?
priority_key.nil?
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def self.default_info
heartbeat_details: [],
heartbeat_timeout: nil,
local?: false,
priority: Temporalio::Priority.default,
schedule_to_close_timeout: 1.0,
scheduled_time: Time.at(0),
start_to_close_timeout: 1.0,
Expand Down
6 changes: 4 additions & 2 deletions temporalio/lib/temporalio/worker/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ def handle_update(input)
:cancellation_type,
:activity_id,
:disable_eager_execution,
:headers
:headers,
:priority
)

# Input for {Outbound.execute_local_activity}.
Expand Down Expand Up @@ -284,7 +285,8 @@ def handle_update(input)
:cron_schedule,
:memo,
:search_attributes,
:headers
:headers,
:priority
)

# Outbound interceptor for intercepting outbound workflow calls. This should be extended by users needing to
Expand Down
2 changes: 1 addition & 1 deletion temporalio/lib/temporalio/worker/workflow_replayer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def initialize(
)._to_bridge_options,
identity_override: options.identity,
max_cached_workflows: 2,
max_concurrent_workflow_task_polls: 1,
max_concurrent_workflow_task_polls: 2,
nonsticky_to_sticky_poll_ratio: 1.0,
max_concurrent_activity_task_polls: 1,
no_remote_activities: true,
Expand Down
19 changes: 13 additions & 6 deletions temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'random/formatter'
require 'temporalio/error'
require 'temporalio/priority'
require 'temporalio/workflow/activity_cancellation_type'
require 'temporalio/workflow/child_workflow_cancellation_type'
require 'temporalio/workflow/child_workflow_handle'
Expand Down Expand Up @@ -130,6 +131,7 @@ def self.deprecate_patch(patch_id)
# optimization on some servers that sends activities back to the same worker as the calling workflow if they can
# run there. If `false` (the default), eager execution may still be disabled at the worker level or may not be
# requested due to lack of available slots.
# @param priority [Priority] Priority of the activity. This is currently experimental.
#
# @return [Object] Result of the activity.
# @raise [Error::ActivityError] Activity failed (and retry was disabled or exhausted).
Expand All @@ -148,12 +150,14 @@ def self.execute_activity(
cancellation: Workflow.cancellation,
cancellation_type: ActivityCancellationType::TRY_CANCEL,
activity_id: nil,
disable_eager_execution: false
disable_eager_execution: false,
priority: Priority.default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the import to this file and to activity.rb? I know it doesn't make sense to add everywhere, but along with temporalio/client, these two are kinda "single require entry points" to the SDK.

)
_current.execute_activity(
activity, *args,
task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:
heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:,
priority:
)
end

Expand All @@ -175,13 +179,14 @@ def self.execute_child_workflow(
retry_policy: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
priority: Priority.default
)
start_child_workflow(
workflow, *args,
id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
retry_policy:, cron_schedule:, memo:, search_attributes:
retry_policy:, cron_schedule:, memo:, search_attributes:, priority:
).result
end

Expand Down Expand Up @@ -372,6 +377,7 @@ def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation)
# @param cron_schedule [String, nil] Cron schedule. Users should use schedules instead of this.
# @param memo [Hash{String, Symbol => Object}, nil] Memo for the workflow.
# @param search_attributes [SearchAttributes, nil] Search attributes for the workflow.
# @param priority [Priority] Priority of the workflow. This is currently experimental.
#
# @return [ChildWorkflowHandle] Workflow handle to the started workflow.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists for the ID.
Expand All @@ -393,13 +399,14 @@ def self.start_child_workflow(
retry_policy: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
priority: Priority.default
)
_current.start_child_workflow(
workflow, *args,
id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
retry_policy:, cron_schedule:, memo:, search_attributes:
retry_policy:, cron_schedule:, memo:, search_attributes:, priority:
)
end

Expand Down
3 changes: 3 additions & 0 deletions temporalio/lib/temporalio/workflow/info.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Workflow
:last_result,
:namespace,
:parent,
:priority,
:retry_policy,
:root,
:run_id,
Expand Down Expand Up @@ -44,6 +45,8 @@ module Workflow
# @return [String] Namespace for the workflow.
# @!attribute parent
# @return [ParentInfo, nil] Parent information for the workflow if this is a child.
# @!attribute priority
# @return [Priority] The priority of this workflow.
# @!attribute retry_policy
# @return [RetryPolicy, nil] Retry policy for the workflow.
# @!attribute root
Expand Down
4 changes: 3 additions & 1 deletion temporalio/sig/temporalio/activity/info.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Temporalio
attr_reader heartbeat_details: Array[Object?]
attr_reader heartbeat_timeout: Float?
attr_reader local?: bool
attr_reader priority: Temporalio::Priority
attr_reader schedule_to_close_timeout: Float?
attr_reader scheduled_time: Time
attr_reader start_to_close_timeout: Float?
Expand All @@ -27,6 +28,7 @@ module Temporalio
heartbeat_details: Array[Object?],
heartbeat_timeout: Float?,
local?: bool,
priority: Temporalio::Priority?,
schedule_to_close_timeout: Float?,
scheduled_time: Time,
start_to_close_timeout: Float?,
Expand All @@ -42,4 +44,4 @@ module Temporalio
def with: (**untyped) -> Info
end
end
end
end
Loading
Loading