Skip to content

Commit 64552cf

Browse files
authored
Add Poller Group fields (#744)
- **Add poller groups** - **rename resource_id to routing key for nexus** - **fix typo** - **Add poller group fields** <!-- Describe what has changed in this PR --> **What changed?** Adding `poller-group-id` and `poller-group-info` fields to the poll APIs. <!-- Tell your future self why have you made these changes --> **Why?** SDK needs to pass poller group ID in some APIs according to server instructions. <!-- Are there any breaking changes on binary or code level? --> **Breaking changes** None <!-- If this breaks the Server, please provide the Server PR to merge right after this PR was merged. --> **Server PR** No new API added.
1 parent 9109a85 commit 64552cf

5 files changed

Lines changed: 134 additions & 3 deletions

File tree

openapi/openapiv2.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15379,6 +15379,30 @@
1537915379
"pollerScalingDecision": {
1538015380
"$ref": "#/definitions/v1PollerScalingDecision",
1538115381
"description": "Server-advised information the SDK may use to adjust its poller count."
15382+
},
15383+
"pollerGroupId": {
15384+
"type": "string",
15385+
"description": "This poller group ID identifies the owner of the workflow task awaiting for query response.\nCorresponding RespondQueryTaskCompleted should pass this value for proper routing."
15386+
},
15387+
"pollerGroupInfos": {
15388+
"type": "array",
15389+
"items": {
15390+
"type": "object",
15391+
"$ref": "#/definitions/v1PollerGroupInfo"
15392+
},
15393+
"description": "The weighted list of poller groups IDs that client should use for future polls to this task\nqueue. Client is expected to:\n 1. Maintain minimum number of pollers no less than the number of groups.\n 2. Try to assign the next poll to a group without any pending polls,\n 3. If every group has some pending polls, assign the next poll to a group randomly\n according to the weights."
15394+
}
15395+
}
15396+
},
15397+
"v1PollerGroupInfo": {
15398+
"type": "object",
15399+
"properties": {
15400+
"id": {
15401+
"type": "string"
15402+
},
15403+
"weight": {
15404+
"type": "number",
15405+
"format": "float"
1538215406
}
1538315407
}
1538415408
},

openapi/openapiv3.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12220,6 +12220,30 @@ components:
1222012220
allOf:
1222112221
- $ref: '#/components/schemas/PollerScalingDecision'
1222212222
description: Server-advised information the SDK may use to adjust its poller count.
12223+
pollerGroupId:
12224+
type: string
12225+
description: |-
12226+
This poller group ID identifies the owner of the workflow task awaiting for query response.
12227+
Corresponding RespondQueryTaskCompleted should pass this value for proper routing.
12228+
pollerGroupInfos:
12229+
type: array
12230+
items:
12231+
$ref: '#/components/schemas/PollerGroupInfo'
12232+
description: |-
12233+
The weighted list of poller groups IDs that client should use for future polls to this task
12234+
queue. Client is expected to:
12235+
1. Maintain minimum number of pollers no less than the number of groups.
12236+
2. Try to assign the next poll to a group without any pending polls,
12237+
3. If every group has some pending polls, assign the next poll to a group randomly
12238+
according to the weights.
12239+
PollerGroupInfo:
12240+
type: object
12241+
properties:
12242+
id:
12243+
type: string
12244+
weight:
12245+
type: number
12246+
format: float
1222312247
PollerInfo:
1222412248
type: object
1222512249
properties:

temporal/api/taskqueue/v1/message.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,11 @@ message TimestampedCompatibleBuildIdRedirectRule {
303303
google.protobuf.Timestamp create_time = 2;
304304
}
305305

306+
message PollerGroupInfo {
307+
string id = 1;
308+
float weight = 2;
309+
}
310+
306311
// Attached to task responses to give hints to the SDK about how it may adjust its number of
307312
// pollers.
308313
message PollerScalingDecision {

temporal/api/workflowservice/v1/request_response.proto

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,11 @@ message GetWorkflowExecutionHistoryReverseResponse {
260260
message PollWorkflowTaskQueueRequest {
261261
string namespace = 1;
262262
temporal.api.taskqueue.v1.TaskQueue task_queue = 2;
263+
// Unless this is the first poll, the client must pass one of the poller group IDs received in
264+
// `poller_group_infos` of the last the PollWorkflowTaskQueueResponse according to the
265+
// instructions. If not set, the poll is routed randomly which can cause it being blocked
266+
// without receiving a task while the queue actually has tasks in another server location.
267+
string poller_group_id = 10;
263268
// The identity of the worker/client who is polling this task queue
264269
string identity = 3;
265270
// A unique key for this worker instance, used for tracking worker lifecycle.
@@ -282,7 +287,7 @@ message PollWorkflowTaskQueueRequest {
282287
temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 6;
283288

284289
// Removed in 1.55.0; was temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat
285-
reserved 7;
290+
reserved 7;
286291
reserved "worker_heartbeat";
287292
}
288293

@@ -338,6 +343,16 @@ message PollWorkflowTaskQueueResponse {
338343
repeated temporal.api.protocol.v1.Message messages = 15;
339344
// Server-advised information the SDK may use to adjust its poller count.
340345
temporal.api.taskqueue.v1.PollerScalingDecision poller_scaling_decision = 16;
346+
// This poller group ID identifies the owner of the workflow task awaiting for query response.
347+
// Corresponding RespondQueryTaskCompleted should pass this value for proper routing.
348+
string poller_group_id = 17;
349+
// The weighted list of poller groups IDs that client should use for future polls to this task
350+
// queue. Client is expected to:
351+
// 1. Maintain minimum number of pollers no less than the number of groups.
352+
// 2. Try to assign the next poll to a group without any pending polls,
353+
// 3. If every group has some pending polls, assign the next poll to a group randomly
354+
// according to the weights.
355+
repeated temporal.api.taskqueue.v1.PollerGroupInfo poller_group_infos = 18;
341356
}
342357

343358
message RespondWorkflowTaskCompletedRequest {
@@ -458,6 +473,11 @@ message RespondWorkflowTaskFailedResponse {
458473
message PollActivityTaskQueueRequest {
459474
string namespace = 1;
460475
temporal.api.taskqueue.v1.TaskQueue task_queue = 2;
476+
// Unless this is the first poll, the client must pass one of the poller group IDs received in
477+
// `poller_group_infos` of the last the PollActivityTaskQueueResponse according to the
478+
// instructions. If not set, the poll is routed randomly which can cause it being blocked
479+
// without receiving a task while the queue actually has tasks in another server location.
480+
string poller_group_id = 10;
461481
// The identity of the worker/client
462482
string identity = 3;
463483
// A unique key for this worker instance, used for tracking worker lifecycle.
@@ -536,6 +556,13 @@ message PollActivityTaskQueueResponse {
536556
temporal.api.common.v1.Priority priority = 19;
537557
// The run ID of the activity execution, only set for standalone activities.
538558
string activity_run_id = 20;
559+
// The weighted list of poller groups IDs that client should use for future polls to this task
560+
// queue. Client is expected to:
561+
// 1. Maintain minimum number of pollers no less than the number of groups.
562+
// 2. Try to assign the next poll to a group without any pending polls,
563+
// 3. If every group has some pending polls, assign the next poll to a group randomly
564+
// according to the weights.
565+
repeated temporal.api.taskqueue.v1.PollerGroupInfo poller_group_infos = 21;
539566
}
540567

541568
message RecordActivityTaskHeartbeatRequest {
@@ -1058,6 +1085,9 @@ message RespondQueryTaskCompletedRequest {
10581085
// Why did the task fail? It's important to note that many of the variants in this enum cannot
10591086
// apply to worker responses. See the type's doc for more.
10601087
temporal.api.enums.v1.WorkflowTaskFailedCause cause = 8;
1088+
// Client must forward the poller_group_id received in PollWorkflowTaskQueueResponse for proper
1089+
// routing of the response.
1090+
string poller_group_id = 9;
10611091
}
10621092

10631093
message RespondQueryTaskCompletedResponse {
@@ -1907,12 +1937,17 @@ message PollWorkflowExecutionUpdateResponse {
19071937

19081938
message PollNexusTaskQueueRequest {
19091939
string namespace = 1;
1940+
temporal.api.taskqueue.v1.TaskQueue task_queue = 3;
1941+
// Unless this is the first poll, the client must pass one of the poller group IDs received in
1942+
// `poller_group_infos` of the last the PollNexusTaskQueueResponse according to the
1943+
// instructions. If not set, the poll is routed randomly which can cause it being blocked
1944+
// without receiving a task while the queue actually has tasks in another server location.
1945+
string poller_group_id = 9;
19101946
// The identity of the client who initiated this request.
19111947
string identity = 2;
19121948
// A unique key for this worker instance, used for tracking worker lifecycle.
19131949
// This is guaranteed to be unique, whereas identity is not guaranteed to be unique.
19141950
string worker_instance_key = 8;
1915-
temporal.api.taskqueue.v1.TaskQueue task_queue = 3;
19161951
// Information about this worker's build identifier and if it is choosing to use the versioning
19171952
// feature. See the `WorkerVersionCapabilities` docstring for more.
19181953
// Deprecated. Replaced by deployment_options.
@@ -1931,6 +1966,18 @@ message PollNexusTaskQueueResponse {
19311966
temporal.api.nexus.v1.Request request = 2;
19321967
// Server-advised information the SDK may use to adjust its poller count.
19331968
temporal.api.taskqueue.v1.PollerScalingDecision poller_scaling_decision = 3;
1969+
// This poller group ID identifies the owner of the nexus task awaiting for synchronous
1970+
// response.
1971+
// Corresponding `RespondNexusTaskCompleted` and `RespondNexusTaskFailed` calls should pass this
1972+
// value for proper response routing.
1973+
string poller_group_id = 4;
1974+
// The weighted list of poller groups IDs that client should use for future polls to this task
1975+
// queue. Client is expected to:
1976+
// 1. Maintain minimum number of pollers no less than the number of groups.
1977+
// 2. Try to assign the next poll to a group without any pending polls,
1978+
// 3. If every group has some pending polls, assign the next poll to a group randomly
1979+
// according to the weights.
1980+
repeated temporal.api.taskqueue.v1.PollerGroupInfo poller_group_infos = 5;
19341981
}
19351982

19361983
message RespondNexusTaskCompletedRequest {
@@ -1941,6 +1988,9 @@ message RespondNexusTaskCompletedRequest {
19411988
bytes task_token = 3;
19421989
// Embedded response to be translated into a frontend response.
19431990
temporal.api.nexus.v1.Response response = 4;
1991+
// Client must forward the poller_group_id received in PollNexusTaskQueueResponse for proper
1992+
// routing of the response.
1993+
string poller_group_id = 5;
19441994
}
19451995

19461996
message RespondNexusTaskCompletedResponse {
@@ -1956,6 +2006,9 @@ message RespondNexusTaskFailedRequest {
19562006
temporal.api.nexus.v1.HandlerError error = 4 [deprecated = true];
19572007
// The error the handler failed with. Must contain a NexusHandlerFailureInfo object.
19582008
temporal.api.failure.v1.Failure failure = 5;
2009+
// Client must forward the poller_group_id received in PollNexusTaskQueueResponse for proper
2010+
// routing of the response.
2011+
string poller_group_id = 6;
19592012
}
19602013

19612014
message RespondNexusTaskFailedResponse {

temporal/api/workflowservice/v1/service.proto

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ service WorkflowService {
166166
// (-- api-linter: core::0127::http-annotation=disabled
167167
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
168168
rpc PollWorkflowTaskQueue (PollWorkflowTaskQueueRequest) returns (PollWorkflowTaskQueueResponse) {
169+
option (temporal.api.protometa.v1.request_header) = {
170+
header: "temporal-resource-id"
171+
value: "poller:{poller_group_id}"
172+
};
169173
}
170174

171175
// RespondWorkflowTaskCompleted is called by workers to successfully complete workflow tasks
@@ -219,6 +223,10 @@ service WorkflowService {
219223
// (-- api-linter: core::0127::http-annotation=disabled
220224
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
221225
rpc PollActivityTaskQueue (PollActivityTaskQueueRequest) returns (PollActivityTaskQueueResponse) {
226+
option (temporal.api.protometa.v1.request_header) = {
227+
header: "temporal-resource-id"
228+
value: "poller:{poller_group_id}"
229+
};
222230
}
223231

224232
// RecordActivityTaskHeartbeat is optionally called by workers while they execute activities.
@@ -618,7 +626,12 @@ service WorkflowService {
618626
//
619627
// (-- api-linter: core::0127::http-annotation=disabled
620628
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
621-
rpc RespondQueryTaskCompleted (RespondQueryTaskCompletedRequest) returns (RespondQueryTaskCompletedResponse) {}
629+
rpc RespondQueryTaskCompleted (RespondQueryTaskCompletedRequest) returns (RespondQueryTaskCompletedResponse) {
630+
option (temporal.api.protometa.v1.request_header) = {
631+
header: "temporal-resource-id"
632+
value: "poller:{poller_group_id}"
633+
};
634+
}
622635

623636
// ResetStickyTaskQueue resets the sticky task queue related information in the mutable state of
624637
// a given workflow. This is prudent for workers to perform if a workflow has been paged out of
@@ -1296,18 +1309,30 @@ service WorkflowService {
12961309
// (-- api-linter: core::0127::http-annotation=disabled
12971310
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
12981311
rpc PollNexusTaskQueue(PollNexusTaskQueueRequest) returns (PollNexusTaskQueueResponse) {
1312+
option (temporal.api.protometa.v1.request_header) = {
1313+
header: "temporal-resource-id"
1314+
value: "poller:{poller_group_id}"
1315+
};
12991316
}
13001317

13011318
// RespondNexusTaskCompleted is called by workers to respond to Nexus tasks received via PollNexusTaskQueue.
13021319
// (-- api-linter: core::0127::http-annotation=disabled
13031320
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
13041321
rpc RespondNexusTaskCompleted(RespondNexusTaskCompletedRequest) returns (RespondNexusTaskCompletedResponse) {
1322+
option (temporal.api.protometa.v1.request_header) = {
1323+
header: "temporal-resource-id"
1324+
value: "poller:{poller_group_id}"
1325+
};
13051326
}
13061327

13071328
// RespondNexusTaskFailed is called by workers to fail Nexus tasks received via PollNexusTaskQueue.
13081329
// (-- api-linter: core::0127::http-annotation=disabled
13091330
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
13101331
rpc RespondNexusTaskFailed(RespondNexusTaskFailedRequest) returns (RespondNexusTaskFailedResponse) {
1332+
option (temporal.api.protometa.v1.request_header) = {
1333+
header: "temporal-resource-id"
1334+
value: "poller:{poller_group_id}"
1335+
};
13111336
}
13121337

13131338
// UpdateActivityOptions is called by the client to update the options of an activity by its ID or type.

0 commit comments

Comments
 (0)