Skip to content

Commit 97093c5

Browse files
authored
Priority field passthrough (#892)
1 parent 56a1e14 commit 97093c5

31 files changed

Lines changed: 5944 additions & 2075 deletions

File tree

.cargo/config.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
[env]
2+
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
3+
# TODO: Comment out once priority is released
4+
CLI_VERSION_OVERRIDE = "v1.3.1-priority.0"
5+
16
[alias]
27
integ-test = ["run", "--package", "temporal-sdk-core", "--example", "integ_runner", "--"]
38
lint = ["clippy", "--workspace", "--examples", "--all-features",

client/src/lib.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,51 @@ pub struct WorkflowOptions {
10881088
/// Callbacks that will be invoked upon workflow completion. For, ex, completing nexus
10891089
/// operations.
10901090
pub completion_callbacks: Vec<common::v1::Callback>,
1091+
1092+
/// Priority for the workflow
1093+
pub priority: Option<Priority>,
1094+
}
1095+
1096+
/// Priority contains metadata that controls relative ordering of task processing
1097+
/// when tasks are backlogged in a queue. Initially, Priority will be used in
1098+
/// activity and workflow task queues, which are typically where backlogs exist.
1099+
/// Other queues in the server (such as transfer and timer queues) and rate
1100+
/// limiting decisions do not use Priority, but may in the future.
1101+
///
1102+
/// Priority is attached to workflows and activities. Activities and child
1103+
/// workflows inherit Priority from the workflow that created them, but may
1104+
/// override fields when they are started or modified. For each field of a
1105+
/// Priority on an activity/workflow, not present or equal to zero/empty string
1106+
/// means to inherit the value from the calling workflow, or if there is no
1107+
/// calling workflow, then use the default (documented below).
1108+
///
1109+
/// Despite being named "Priority", this message will also contains fields that
1110+
/// control "fairness" mechanisms.
1111+
///
1112+
/// The overall semantics of Priority are:
1113+
/// (more will be added here later)
1114+
/// 1. First, consider "priority_key": lower number goes first.
1115+
#[derive(Debug, Clone, Default)]
1116+
pub struct Priority {
1117+
/// Priority key is a positive integer from 1 to n, where smaller integers
1118+
/// correspond to higher priorities (tasks run sooner). In general, tasks in
1119+
/// a queue should be processed in close to priority order, although small
1120+
/// deviations are possible.
1121+
///
1122+
/// The maximum priority value (minimum priority) is determined by server
1123+
/// configuration, and defaults to 5.
1124+
///
1125+
/// The default priority is (min+max)/2. With the default max of 5 and min of
1126+
/// 1, that comes out to 3.
1127+
pub priority_key: u32,
1128+
}
1129+
1130+
impl From<Priority> for common::v1::Priority {
1131+
fn from(priority: Priority) -> Self {
1132+
common::v1::Priority {
1133+
priority_key: priority.priority_key as i32,
1134+
}
1135+
}
10911136
}
10921137

10931138
#[async_trait::async_trait]
@@ -1140,6 +1185,7 @@ where
11401185
retry_policy: options.retry_policy,
11411186
links: options.links,
11421187
completion_callbacks: options.completion_callbacks,
1188+
priority: options.priority.map(Into::into),
11431189
..Default::default()
11441190
})
11451191
.await?

client/src/raw.rs

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,27 +1178,27 @@ proxier! {
11781178
}
11791179
);
11801180
(
1181-
update_activity_options_by_id,
1182-
UpdateActivityOptionsByIdRequest,
1183-
UpdateActivityOptionsByIdResponse,
1181+
update_activity_options,
1182+
UpdateActivityOptionsRequest,
1183+
UpdateActivityOptionsResponse,
11841184
|r| {
11851185
let labels = namespaced_request!(r);
11861186
r.extensions_mut().insert(labels);
11871187
}
11881188
);
11891189
(
1190-
pause_activity_by_id,
1191-
PauseActivityByIdRequest,
1192-
PauseActivityByIdResponse,
1190+
pause_activity,
1191+
PauseActivityRequest,
1192+
PauseActivityResponse,
11931193
|r| {
11941194
let labels = namespaced_request!(r);
11951195
r.extensions_mut().insert(labels);
11961196
}
11971197
);
11981198
(
1199-
unpause_activity_by_id,
1200-
UnpauseActivityByIdRequest,
1201-
UnpauseActivityByIdResponse,
1199+
unpause_activity,
1200+
UnpauseActivityRequest,
1201+
UnpauseActivityResponse,
12021202
|r| {
12031203
let labels = namespaced_request!(r);
12041204
r.extensions_mut().insert(labels);
@@ -1214,9 +1214,81 @@ proxier! {
12141214
}
12151215
);
12161216
(
1217-
reset_activity_by_id,
1218-
ResetActivityByIdRequest,
1219-
ResetActivityByIdResponse,
1217+
reset_activity,
1218+
ResetActivityRequest,
1219+
ResetActivityResponse,
1220+
|r| {
1221+
let labels = namespaced_request!(r);
1222+
r.extensions_mut().insert(labels);
1223+
}
1224+
);
1225+
(
1226+
delete_worker_deployment,
1227+
DeleteWorkerDeploymentRequest,
1228+
DeleteWorkerDeploymentResponse,
1229+
|r| {
1230+
let labels = namespaced_request!(r);
1231+
r.extensions_mut().insert(labels);
1232+
}
1233+
);
1234+
(
1235+
delete_worker_deployment_version,
1236+
DeleteWorkerDeploymentVersionRequest,
1237+
DeleteWorkerDeploymentVersionResponse,
1238+
|r| {
1239+
let labels = namespaced_request!(r);
1240+
r.extensions_mut().insert(labels);
1241+
}
1242+
);
1243+
(
1244+
describe_worker_deployment,
1245+
DescribeWorkerDeploymentRequest,
1246+
DescribeWorkerDeploymentResponse,
1247+
|r| {
1248+
let labels = namespaced_request!(r);
1249+
r.extensions_mut().insert(labels);
1250+
}
1251+
);
1252+
(
1253+
describe_worker_deployment_version,
1254+
DescribeWorkerDeploymentVersionRequest,
1255+
DescribeWorkerDeploymentVersionResponse,
1256+
|r| {
1257+
let labels = namespaced_request!(r);
1258+
r.extensions_mut().insert(labels);
1259+
}
1260+
);
1261+
(
1262+
list_worker_deployments,
1263+
ListWorkerDeploymentsRequest,
1264+
ListWorkerDeploymentsResponse,
1265+
|r| {
1266+
let labels = namespaced_request!(r);
1267+
r.extensions_mut().insert(labels);
1268+
}
1269+
);
1270+
(
1271+
set_worker_deployment_current_version,
1272+
SetWorkerDeploymentCurrentVersionRequest,
1273+
SetWorkerDeploymentCurrentVersionResponse,
1274+
|r| {
1275+
let labels = namespaced_request!(r);
1276+
r.extensions_mut().insert(labels);
1277+
}
1278+
);
1279+
(
1280+
set_worker_deployment_ramping_version,
1281+
SetWorkerDeploymentRampingVersionRequest,
1282+
SetWorkerDeploymentRampingVersionResponse,
1283+
|r| {
1284+
let labels = namespaced_request!(r);
1285+
r.extensions_mut().insert(labels);
1286+
}
1287+
);
1288+
(
1289+
update_worker_deployment_version_metadata,
1290+
UpdateWorkerDeploymentVersionMetadataRequest,
1291+
UpdateWorkerDeploymentVersionMetadataResponse,
12201292
|r| {
12211293
let labels = namespaced_request!(r);
12221294
r.extensions_mut().insert(labels);

core/src/worker/client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,14 @@ impl WorkerClient for WorkerClientBag {
180180
&self,
181181
task_queue: TaskQueue,
182182
) -> Result<PollWorkflowTaskQueueResponse> {
183+
#[allow(deprecated)] // want to list all fields explicitly
183184
let mut request = PollWorkflowTaskQueueRequest {
184185
namespace: self.namespace.clone(),
185186
task_queue: Some(task_queue),
186187
identity: self.identity.clone(),
187188
binary_checksum: self.binary_checksum(),
188189
worker_version_capabilities: self.worker_version_capabilities(),
190+
deployment_options: None,
189191
}
190192
.into_request();
191193
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -202,6 +204,7 @@ impl WorkerClient for WorkerClientBag {
202204
task_queue: String,
203205
max_tasks_per_sec: Option<f64>,
204206
) -> Result<PollActivityTaskQueueResponse> {
207+
#[allow(deprecated)] // want to list all fields explicitly
205208
let mut request = PollActivityTaskQueueRequest {
206209
namespace: self.namespace.clone(),
207210
task_queue: Some(TaskQueue {
@@ -214,6 +217,7 @@ impl WorkerClient for WorkerClientBag {
214217
max_tasks_per_second: Some(tps),
215218
}),
216219
worker_version_capabilities: self.worker_version_capabilities(),
220+
deployment_options: None,
217221
}
218222
.into_request();
219223
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -226,6 +230,7 @@ impl WorkerClient for WorkerClientBag {
226230
}
227231

228232
async fn poll_nexus_task(&self, task_queue: String) -> Result<PollNexusTaskQueueResponse> {
233+
#[allow(deprecated)] // want to list all fields explicitly
229234
let mut request = PollNexusTaskQueueRequest {
230235
namespace: self.namespace.clone(),
231236
task_queue: Some(TaskQueue {
@@ -235,6 +240,7 @@ impl WorkerClient for WorkerClientBag {
235240
}),
236241
identity: self.identity.clone(),
237242
worker_version_capabilities: self.worker_version_capabilities(),
243+
deployment_options: None,
238244
}
239245
.into_request();
240246
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -287,6 +293,7 @@ impl WorkerClient for WorkerClientBag {
287293
// TODO: https://github.com/temporalio/sdk-core/issues/866
288294
deployment: None,
289295
versioning_behavior: 0,
296+
deployment_options: None,
290297
};
291298
Ok(self
292299
.cloned_client()
@@ -312,6 +319,7 @@ impl WorkerClient for WorkerClientBag {
312319
worker_version: self.worker_version_stamp(),
313320
// TODO: https://github.com/temporalio/sdk-core/issues/866
314321
deployment: None,
322+
deployment_options: None,
315323
},
316324
)
317325
.await?
@@ -369,6 +377,7 @@ impl WorkerClient for WorkerClientBag {
369377
worker_version: self.worker_version_stamp(),
370378
// TODO: https://github.com/temporalio/sdk-core/issues/866
371379
deployment: None,
380+
deployment_options: None,
372381
},
373382
)
374383
.await?
@@ -394,6 +403,7 @@ impl WorkerClient for WorkerClientBag {
394403
worker_version: self.worker_version_stamp(),
395404
// TODO: https://github.com/temporalio/sdk-core/issues/866
396405
deployment: None,
406+
deployment_options: None,
397407
},
398408
)
399409
.await?
@@ -418,6 +428,7 @@ impl WorkerClient for WorkerClientBag {
418428
worker_version: self.worker_version_stamp(),
419429
// TODO: https://github.com/temporalio/sdk-core/issues/866
420430
deployment: None,
431+
deployment_options: None,
421432
};
422433
Ok(self
423434
.cloned_client()

core/src/worker/workflow/machines/nexus_operation_state_machine.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ impl NexusOperationMachine {
534534
service: self.shared_state.service.clone(),
535535
operation: self.shared_state.operation.clone(),
536536
operation_id: "".to_string(),
537+
operation_token: "".to_string(),
537538
},
538539
)),
539540
..Default::default()

sdk-core-protos/protos/api_upstream/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
SHELL=/bin/bash -o pipefail
1+
SHELL=bash -o pipefail
22

33
$(VERBOSE).SILENT:
44
############################# Main targets #############################
@@ -76,7 +76,7 @@ http-api-docs:
7676
##### Plugins & tools #####
7777
grpc-install:
7878
@printf $(COLOR) "Install/update protoc and plugins..."
79-
@go install go.temporal.io/api/cmd/protogen@latest
79+
@go install go.temporal.io/api/cmd/protogen@master
8080
@go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
8181
@go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
8282
@go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest

0 commit comments

Comments
 (0)