diff --git a/api/taskqueue/v1/message.go-helpers.pb.go b/api/taskqueue/v1/message.go-helpers.pb.go index 4dd71fdb036..191ec828f23 100644 --- a/api/taskqueue/v1/message.go-helpers.pb.go +++ b/api/taskqueue/v1/message.go-helpers.pb.go @@ -227,6 +227,43 @@ func (this *TaskQueuePartition) Equal(that interface{}) bool { return proto.Equal(this, that1) } +// Marshal an object of type WorkerCommandsPartitionId to the protobuf v3 wire format +func (val *WorkerCommandsPartitionId) Marshal() ([]byte, error) { + return proto.Marshal(val) +} + +// Unmarshal an object of type WorkerCommandsPartitionId from the protobuf v3 wire format +func (val *WorkerCommandsPartitionId) Unmarshal(buf []byte) error { + return proto.Unmarshal(buf, val) +} + +// Size returns the size of the object, in bytes, once serialized +func (val *WorkerCommandsPartitionId) Size() int { + return proto.Size(val) +} + +// Equal returns whether two WorkerCommandsPartitionId values are equivalent by recursively +// comparing the message's fields. +// For more information see the documentation for +// https://pkg.go.dev/google.golang.org/protobuf/proto#Equal +func (this *WorkerCommandsPartitionId) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + var that1 *WorkerCommandsPartitionId + switch t := that.(type) { + case *WorkerCommandsPartitionId: + that1 = t + case WorkerCommandsPartitionId: + that1 = &t + default: + return false + } + + return proto.Equal(this, that1) +} + // Marshal an object of type BuildIdRedirectInfo to the protobuf v3 wire format func (val *BuildIdRedirectInfo) Marshal() ([]byte, error) { return proto.Marshal(val) diff --git a/api/taskqueue/v1/message.pb.go b/api/taskqueue/v1/message.pb.go index e043253cddc..8a44ca56f15 100644 --- a/api/taskqueue/v1/message.pb.go +++ b/api/taskqueue/v1/message.pb.go @@ -477,7 +477,8 @@ func (x *PhysicalTaskQueueInfo) GetTaskQueueStatsByPriorityKey() map[int32]*v13. return nil } -// Represents a normal or sticky partition of a task queue. +// Internal representation of a task queue partition, used for server-to-server RPCs. +// This is the internal equivalent of temporal.api.taskqueue.v1.TaskQueue. type TaskQueuePartition struct { state protoimpl.MessageState `protogen:"open.v1"` // This is the user-facing name for this task queue @@ -489,6 +490,7 @@ type TaskQueuePartition struct { // // *TaskQueuePartition_NormalPartitionId // *TaskQueuePartition_StickyName + // *TaskQueuePartition_WorkerCommands PartitionId isTaskQueuePartition_PartitionId `protobuf_oneof:"partition_id"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -563,6 +565,15 @@ func (x *TaskQueuePartition) GetStickyName() string { return "" } +func (x *TaskQueuePartition) GetWorkerCommands() *WorkerCommandsPartitionId { + if x != nil { + if x, ok := x.PartitionId.(*TaskQueuePartition_WorkerCommands); ok { + return x.WorkerCommands + } + } + return nil +} + type isTaskQueuePartition_PartitionId interface { isTaskQueuePartition_PartitionId() } @@ -575,10 +586,52 @@ type TaskQueuePartition_StickyName struct { StickyName string `protobuf:"bytes,4,opt,name=sticky_name,json=stickyName,proto3,oneof"` } +type TaskQueuePartition_WorkerCommands struct { + WorkerCommands *WorkerCommandsPartitionId `protobuf:"bytes,5,opt,name=worker_commands,json=workerCommands,proto3,oneof"` +} + func (*TaskQueuePartition_NormalPartitionId) isTaskQueuePartition_PartitionId() {} func (*TaskQueuePartition_StickyName) isTaskQueuePartition_PartitionId() {} +func (*TaskQueuePartition_WorkerCommands) isTaskQueuePartition_PartitionId() {} + +type WorkerCommandsPartitionId struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WorkerCommandsPartitionId) Reset() { + *x = WorkerCommandsPartitionId{} + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WorkerCommandsPartitionId) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkerCommandsPartitionId) ProtoMessage() {} + +func (x *WorkerCommandsPartitionId) ProtoReflect() protoreflect.Message { + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkerCommandsPartitionId.ProtoReflect.Descriptor instead. +func (*WorkerCommandsPartitionId) Descriptor() ([]byte, []int) { + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{6} +} + // Information about redirect intention sent by Matching to History in Record*TaskStarted calls. // Deprecated. type BuildIdRedirectInfo struct { @@ -593,7 +646,7 @@ type BuildIdRedirectInfo struct { func (x *BuildIdRedirectInfo) Reset() { *x = BuildIdRedirectInfo{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[6] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -605,7 +658,7 @@ func (x *BuildIdRedirectInfo) String() string { func (*BuildIdRedirectInfo) ProtoMessage() {} func (x *BuildIdRedirectInfo) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[6] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -618,7 +671,7 @@ func (x *BuildIdRedirectInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use BuildIdRedirectInfo.ProtoReflect.Descriptor instead. func (*BuildIdRedirectInfo) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{6} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{7} } func (x *BuildIdRedirectInfo) GetAssignedBuildId() string { @@ -660,7 +713,7 @@ type TaskForwardInfo struct { func (x *TaskForwardInfo) Reset() { *x = TaskForwardInfo{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[7] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -672,7 +725,7 @@ func (x *TaskForwardInfo) String() string { func (*TaskForwardInfo) ProtoMessage() {} func (x *TaskForwardInfo) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[7] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -685,7 +738,7 @@ func (x *TaskForwardInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskForwardInfo.ProtoReflect.Descriptor instead. func (*TaskForwardInfo) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{7} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{8} } func (x *TaskForwardInfo) GetSourcePartition() string { @@ -750,7 +803,7 @@ type EphemeralData struct { func (x *EphemeralData) Reset() { *x = EphemeralData{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[8] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -762,7 +815,7 @@ func (x *EphemeralData) String() string { func (*EphemeralData) ProtoMessage() {} func (x *EphemeralData) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[8] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -775,7 +828,7 @@ func (x *EphemeralData) ProtoReflect() protoreflect.Message { // Deprecated: Use EphemeralData.ProtoReflect.Descriptor instead. func (*EphemeralData) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{8} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{9} } func (x *EphemeralData) GetPartition() []*EphemeralData_ByPartition { @@ -795,7 +848,7 @@ type VersionedEphemeralData struct { func (x *VersionedEphemeralData) Reset() { *x = VersionedEphemeralData{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[9] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -807,7 +860,7 @@ func (x *VersionedEphemeralData) String() string { func (*VersionedEphemeralData) ProtoMessage() {} func (x *VersionedEphemeralData) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[9] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -820,7 +873,7 @@ func (x *VersionedEphemeralData) ProtoReflect() protoreflect.Message { // Deprecated: Use VersionedEphemeralData.ProtoReflect.Descriptor instead. func (*VersionedEphemeralData) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{9} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{10} } func (x *VersionedEphemeralData) GetData() *EphemeralData { @@ -848,7 +901,7 @@ type ClientPartitionCounts struct { func (x *ClientPartitionCounts) Reset() { *x = ClientPartitionCounts{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[10] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -860,7 +913,7 @@ func (x *ClientPartitionCounts) String() string { func (*ClientPartitionCounts) ProtoMessage() {} func (x *ClientPartitionCounts) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[10] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -873,7 +926,7 @@ func (x *ClientPartitionCounts) ProtoReflect() protoreflect.Message { // Deprecated: Use ClientPartitionCounts.ProtoReflect.Descriptor instead. func (*ClientPartitionCounts) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{10} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{11} } func (x *ClientPartitionCounts) GetRead() int32 { @@ -904,7 +957,7 @@ type EphemeralData_ByVersion struct { func (x *EphemeralData_ByVersion) Reset() { *x = EphemeralData_ByVersion{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[12] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -916,7 +969,7 @@ func (x *EphemeralData_ByVersion) String() string { func (*EphemeralData_ByVersion) ProtoMessage() {} func (x *EphemeralData_ByVersion) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[12] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -929,7 +982,7 @@ func (x *EphemeralData_ByVersion) ProtoReflect() protoreflect.Message { // Deprecated: Use EphemeralData_ByVersion.ProtoReflect.Descriptor instead. func (*EphemeralData_ByVersion) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{8, 0} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{9, 0} } func (x *EphemeralData_ByVersion) GetVersion() *v12.WorkerDeploymentVersion { @@ -956,7 +1009,7 @@ type EphemeralData_ByPartition struct { func (x *EphemeralData_ByPartition) Reset() { *x = EphemeralData_ByPartition{} - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[13] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -968,7 +1021,7 @@ func (x *EphemeralData_ByPartition) String() string { func (*EphemeralData_ByPartition) ProtoMessage() {} func (x *EphemeralData_ByPartition) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[13] + mi := &file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -981,7 +1034,7 @@ func (x *EphemeralData_ByPartition) ProtoReflect() protoreflect.Message { // Deprecated: Use EphemeralData_ByPartition.ProtoReflect.Descriptor instead. func (*EphemeralData_ByPartition) Descriptor() ([]byte, []int) { - return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{8, 1} + return file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP(), []int{9, 1} } func (x *EphemeralData_ByPartition) GetPartition() int32 { @@ -1041,15 +1094,17 @@ const file_temporal_server_api_taskqueue_v1_message_proto_rawDesc = "" + " task_queue_stats_by_priority_key\x18\x04 \x03(\v2X.temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntryR\x1btaskQueueStatsByPriorityKey\x1ay\n" + " TaskQueueStatsByPriorityKeyEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\x05R\x03key\x12?\n" + - "\x05value\x18\x02 \x01(\v2).temporal.api.taskqueue.v1.TaskQueueStatsR\x05value:\x028\x01\"\xe6\x01\n" + + "\x05value\x18\x02 \x01(\v2).temporal.api.taskqueue.v1.TaskQueueStatsR\x05value:\x028\x01\"\xce\x02\n" + "\x12TaskQueuePartition\x12\x1d\n" + "\n" + "task_queue\x18\x01 \x01(\tR\ttaskQueue\x12L\n" + "\x0ftask_queue_type\x18\x02 \x01(\x0e2$.temporal.api.enums.v1.TaskQueueTypeR\rtaskQueueType\x120\n" + "\x13normal_partition_id\x18\x03 \x01(\x05H\x00R\x11normalPartitionId\x12!\n" + "\vsticky_name\x18\x04 \x01(\tH\x00R\n" + - "stickyNameB\x0e\n" + - "\fpartition_id\"A\n" + + "stickyName\x12f\n" + + "\x0fworker_commands\x18\x05 \x01(\v2;.temporal.server.api.taskqueue.v1.WorkerCommandsPartitionIdH\x00R\x0eworkerCommandsB\x0e\n" + + "\fpartition_id\"\x1b\n" + + "\x19WorkerCommandsPartitionId\"A\n" + "\x13BuildIdRedirectInfo\x12*\n" + "\x11assigned_build_id\x18\x01 \x01(\tR\x0fassignedBuildId\"\xa9\x03\n" + "\x0fTaskForwardInfo\x12)\n" + @@ -1089,7 +1144,7 @@ func file_temporal_server_api_taskqueue_v1_message_proto_rawDescGZIP() []byte { return file_temporal_server_api_taskqueue_v1_message_proto_rawDescData } -var file_temporal_server_api_taskqueue_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_temporal_server_api_taskqueue_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_temporal_server_api_taskqueue_v1_message_proto_goTypes = []any{ (*TaskVersionDirective)(nil), // 0: temporal.server.api.taskqueue.v1.TaskVersionDirective (*FairLevel)(nil), // 1: temporal.server.api.taskqueue.v1.FairLevel @@ -1097,53 +1152,55 @@ var file_temporal_server_api_taskqueue_v1_message_proto_goTypes = []any{ (*TaskQueueVersionInfoInternal)(nil), // 3: temporal.server.api.taskqueue.v1.TaskQueueVersionInfoInternal (*PhysicalTaskQueueInfo)(nil), // 4: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo (*TaskQueuePartition)(nil), // 5: temporal.server.api.taskqueue.v1.TaskQueuePartition - (*BuildIdRedirectInfo)(nil), // 6: temporal.server.api.taskqueue.v1.BuildIdRedirectInfo - (*TaskForwardInfo)(nil), // 7: temporal.server.api.taskqueue.v1.TaskForwardInfo - (*EphemeralData)(nil), // 8: temporal.server.api.taskqueue.v1.EphemeralData - (*VersionedEphemeralData)(nil), // 9: temporal.server.api.taskqueue.v1.VersionedEphemeralData - (*ClientPartitionCounts)(nil), // 10: temporal.server.api.taskqueue.v1.ClientPartitionCounts - nil, // 11: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntry - (*EphemeralData_ByVersion)(nil), // 12: temporal.server.api.taskqueue.v1.EphemeralData.ByVersion - (*EphemeralData_ByPartition)(nil), // 13: temporal.server.api.taskqueue.v1.EphemeralData.ByPartition - (*emptypb.Empty)(nil), // 14: google.protobuf.Empty - (v1.VersioningBehavior)(0), // 15: temporal.api.enums.v1.VersioningBehavior - (*v11.Deployment)(nil), // 16: temporal.api.deployment.v1.Deployment - (*v12.WorkerDeploymentVersion)(nil), // 17: temporal.server.api.deployment.v1.WorkerDeploymentVersion - (*v13.TaskIdBlock)(nil), // 18: temporal.api.taskqueue.v1.TaskIdBlock - (*v13.PollerInfo)(nil), // 19: temporal.api.taskqueue.v1.PollerInfo - (*v13.TaskQueueStats)(nil), // 20: temporal.api.taskqueue.v1.TaskQueueStats - (v1.TaskQueueType)(0), // 21: temporal.api.enums.v1.TaskQueueType - (v14.TaskSource)(0), // 22: temporal.server.api.enums.v1.TaskSource - (*timestamppb.Timestamp)(nil), // 23: google.protobuf.Timestamp + (*WorkerCommandsPartitionId)(nil), // 6: temporal.server.api.taskqueue.v1.WorkerCommandsPartitionId + (*BuildIdRedirectInfo)(nil), // 7: temporal.server.api.taskqueue.v1.BuildIdRedirectInfo + (*TaskForwardInfo)(nil), // 8: temporal.server.api.taskqueue.v1.TaskForwardInfo + (*EphemeralData)(nil), // 9: temporal.server.api.taskqueue.v1.EphemeralData + (*VersionedEphemeralData)(nil), // 10: temporal.server.api.taskqueue.v1.VersionedEphemeralData + (*ClientPartitionCounts)(nil), // 11: temporal.server.api.taskqueue.v1.ClientPartitionCounts + nil, // 12: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntry + (*EphemeralData_ByVersion)(nil), // 13: temporal.server.api.taskqueue.v1.EphemeralData.ByVersion + (*EphemeralData_ByPartition)(nil), // 14: temporal.server.api.taskqueue.v1.EphemeralData.ByPartition + (*emptypb.Empty)(nil), // 15: google.protobuf.Empty + (v1.VersioningBehavior)(0), // 16: temporal.api.enums.v1.VersioningBehavior + (*v11.Deployment)(nil), // 17: temporal.api.deployment.v1.Deployment + (*v12.WorkerDeploymentVersion)(nil), // 18: temporal.server.api.deployment.v1.WorkerDeploymentVersion + (*v13.TaskIdBlock)(nil), // 19: temporal.api.taskqueue.v1.TaskIdBlock + (*v13.PollerInfo)(nil), // 20: temporal.api.taskqueue.v1.PollerInfo + (*v13.TaskQueueStats)(nil), // 21: temporal.api.taskqueue.v1.TaskQueueStats + (v1.TaskQueueType)(0), // 22: temporal.api.enums.v1.TaskQueueType + (v14.TaskSource)(0), // 23: temporal.server.api.enums.v1.TaskSource + (*timestamppb.Timestamp)(nil), // 24: google.protobuf.Timestamp } var file_temporal_server_api_taskqueue_v1_message_proto_depIdxs = []int32{ - 14, // 0: temporal.server.api.taskqueue.v1.TaskVersionDirective.use_assignment_rules:type_name -> google.protobuf.Empty - 15, // 1: temporal.server.api.taskqueue.v1.TaskVersionDirective.behavior:type_name -> temporal.api.enums.v1.VersioningBehavior - 16, // 2: temporal.server.api.taskqueue.v1.TaskVersionDirective.deployment:type_name -> temporal.api.deployment.v1.Deployment - 17, // 3: temporal.server.api.taskqueue.v1.TaskVersionDirective.deployment_version:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersion + 15, // 0: temporal.server.api.taskqueue.v1.TaskVersionDirective.use_assignment_rules:type_name -> google.protobuf.Empty + 16, // 1: temporal.server.api.taskqueue.v1.TaskVersionDirective.behavior:type_name -> temporal.api.enums.v1.VersioningBehavior + 17, // 2: temporal.server.api.taskqueue.v1.TaskVersionDirective.deployment:type_name -> temporal.api.deployment.v1.Deployment + 18, // 3: temporal.server.api.taskqueue.v1.TaskVersionDirective.deployment_version:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersion 1, // 4: temporal.server.api.taskqueue.v1.InternalTaskQueueStatus.fair_read_level:type_name -> temporal.server.api.taskqueue.v1.FairLevel 1, // 5: temporal.server.api.taskqueue.v1.InternalTaskQueueStatus.fair_ack_level:type_name -> temporal.server.api.taskqueue.v1.FairLevel - 18, // 6: temporal.server.api.taskqueue.v1.InternalTaskQueueStatus.task_id_block:type_name -> temporal.api.taskqueue.v1.TaskIdBlock + 19, // 6: temporal.server.api.taskqueue.v1.InternalTaskQueueStatus.task_id_block:type_name -> temporal.api.taskqueue.v1.TaskIdBlock 1, // 7: temporal.server.api.taskqueue.v1.InternalTaskQueueStatus.fair_max_read_level:type_name -> temporal.server.api.taskqueue.v1.FairLevel 4, // 8: temporal.server.api.taskqueue.v1.TaskQueueVersionInfoInternal.physical_task_queue_info:type_name -> temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo - 19, // 9: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.pollers:type_name -> temporal.api.taskqueue.v1.PollerInfo + 20, // 9: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.pollers:type_name -> temporal.api.taskqueue.v1.PollerInfo 2, // 10: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.internal_task_queue_status:type_name -> temporal.server.api.taskqueue.v1.InternalTaskQueueStatus - 20, // 11: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.task_queue_stats:type_name -> temporal.api.taskqueue.v1.TaskQueueStats - 11, // 12: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.task_queue_stats_by_priority_key:type_name -> temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntry - 21, // 13: temporal.server.api.taskqueue.v1.TaskQueuePartition.task_queue_type:type_name -> temporal.api.enums.v1.TaskQueueType - 22, // 14: temporal.server.api.taskqueue.v1.TaskForwardInfo.task_source:type_name -> temporal.server.api.enums.v1.TaskSource - 23, // 15: temporal.server.api.taskqueue.v1.TaskForwardInfo.create_time:type_name -> google.protobuf.Timestamp - 6, // 16: temporal.server.api.taskqueue.v1.TaskForwardInfo.redirect_info:type_name -> temporal.server.api.taskqueue.v1.BuildIdRedirectInfo - 13, // 17: temporal.server.api.taskqueue.v1.EphemeralData.partition:type_name -> temporal.server.api.taskqueue.v1.EphemeralData.ByPartition - 8, // 18: temporal.server.api.taskqueue.v1.VersionedEphemeralData.data:type_name -> temporal.server.api.taskqueue.v1.EphemeralData - 20, // 19: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntry.value:type_name -> temporal.api.taskqueue.v1.TaskQueueStats - 17, // 20: temporal.server.api.taskqueue.v1.EphemeralData.ByVersion.version:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersion - 12, // 21: temporal.server.api.taskqueue.v1.EphemeralData.ByPartition.version:type_name -> temporal.server.api.taskqueue.v1.EphemeralData.ByVersion - 22, // [22:22] is the sub-list for method output_type - 22, // [22:22] is the sub-list for method input_type - 22, // [22:22] is the sub-list for extension type_name - 22, // [22:22] is the sub-list for extension extendee - 0, // [0:22] is the sub-list for field type_name + 21, // 11: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.task_queue_stats:type_name -> temporal.api.taskqueue.v1.TaskQueueStats + 12, // 12: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.task_queue_stats_by_priority_key:type_name -> temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntry + 22, // 13: temporal.server.api.taskqueue.v1.TaskQueuePartition.task_queue_type:type_name -> temporal.api.enums.v1.TaskQueueType + 6, // 14: temporal.server.api.taskqueue.v1.TaskQueuePartition.worker_commands:type_name -> temporal.server.api.taskqueue.v1.WorkerCommandsPartitionId + 23, // 15: temporal.server.api.taskqueue.v1.TaskForwardInfo.task_source:type_name -> temporal.server.api.enums.v1.TaskSource + 24, // 16: temporal.server.api.taskqueue.v1.TaskForwardInfo.create_time:type_name -> google.protobuf.Timestamp + 7, // 17: temporal.server.api.taskqueue.v1.TaskForwardInfo.redirect_info:type_name -> temporal.server.api.taskqueue.v1.BuildIdRedirectInfo + 14, // 18: temporal.server.api.taskqueue.v1.EphemeralData.partition:type_name -> temporal.server.api.taskqueue.v1.EphemeralData.ByPartition + 9, // 19: temporal.server.api.taskqueue.v1.VersionedEphemeralData.data:type_name -> temporal.server.api.taskqueue.v1.EphemeralData + 21, // 20: temporal.server.api.taskqueue.v1.PhysicalTaskQueueInfo.TaskQueueStatsByPriorityKeyEntry.value:type_name -> temporal.api.taskqueue.v1.TaskQueueStats + 18, // 21: temporal.server.api.taskqueue.v1.EphemeralData.ByVersion.version:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersion + 13, // 22: temporal.server.api.taskqueue.v1.EphemeralData.ByPartition.version:type_name -> temporal.server.api.taskqueue.v1.EphemeralData.ByVersion + 23, // [23:23] is the sub-list for method output_type + 23, // [23:23] is the sub-list for method input_type + 23, // [23:23] is the sub-list for extension type_name + 23, // [23:23] is the sub-list for extension extendee + 0, // [0:23] is the sub-list for field type_name } func init() { file_temporal_server_api_taskqueue_v1_message_proto_init() } @@ -1158,6 +1215,7 @@ func file_temporal_server_api_taskqueue_v1_message_proto_init() { file_temporal_server_api_taskqueue_v1_message_proto_msgTypes[5].OneofWrappers = []any{ (*TaskQueuePartition_NormalPartitionId)(nil), (*TaskQueuePartition_StickyName)(nil), + (*TaskQueuePartition_WorkerCommands)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1165,7 +1223,7 @@ func file_temporal_server_api_taskqueue_v1_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_temporal_server_api_taskqueue_v1_message_proto_rawDesc), len(file_temporal_server_api_taskqueue_v1_message_proto_rawDesc)), NumEnums: 0, - NumMessages: 14, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/common/metrics/task_queues.go b/common/metrics/task_queues.go index c624d23a3e2..07f1f2ec896 100644 --- a/common/metrics/task_queues.go +++ b/common/metrics/task_queues.go @@ -60,7 +60,7 @@ func GetPerTaskQueuePartitionIDScope( } // GetPerTaskQueuePartitionTypeScope returns GetPerTaskQueueScope scope plus a "partition" tag which -// can be "__normal__", "__sticky__", or "_unknown_". +// can be "__normal__", "__sticky__", "__worker_commands__", or "_unknown_". func GetPerTaskQueuePartitionTypeScope( handler Handler, namespaceName string, diff --git a/common/tqid/task_queue_id.go b/common/tqid/task_queue_id.go index f5582b86604..0754701df45 100644 --- a/common/tqid/task_queue_id.go +++ b/common/tqid/task_queue_id.go @@ -97,6 +97,13 @@ type ( taskQueue *TaskQueue } + // WorkerCommandsPartition is used for server-to-worker communication (e.g. activity cancellations). + // These queues are per-worker-process and only exist for the lifetime of the worker process. The SDK sets + // Kind=TASK_QUEUE_KIND_WORKER_COMMANDS when polling on these queues. + WorkerCommandsPartition struct { + taskQueue *TaskQueue + } + // PartitionKey uniquely identifies a task queue partition, to be used in maps. // Note that task queue kind (sticky vs normal) and normal name for sticky task queues are not // part of the task queue partition identity. @@ -110,6 +117,7 @@ type ( var _ Partition = (*NormalPartition)(nil) var _ Partition = (*StickyPartition)(nil) +var _ Partition = (*WorkerCommandsPartition)(nil) var ( ErrNoParent = errors.New("root task queue partition has no parent") @@ -143,11 +151,13 @@ func UnsafePartitionFromProto(proto *taskqueuepb.TaskQueue, namespaceId string, if err == nil { return p } - kind := proto.GetKind() - switch kind { //nolint:exhaustive + switch proto.GetKind() { //nolint:exhaustive case enumspb.TASK_QUEUE_KIND_STICKY: tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetNormalName()}, taskType} return tq.StickyPartition(proto.GetName()) + case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS: + tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetName()}, taskType} + return tq.WorkerCommandsPartition() default: tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetName()}, taskType} return tq.RootPartition() @@ -173,6 +183,15 @@ func PartitionFromProto(proto *taskqueuepb.TaskQueue, namespaceId string, taskTy } tq := &TaskQueue{TaskQueueFamily{namespaceId, normalName}, taskType} return tq.StickyPartition(baseName), nil + case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS: + if partition != 0 { + return nil, serviceerror.NewInvalidArgumentf("worker-commands partitions cannot have non-zero partition ID. base name: %s", baseName) + } + if taskType != enumspb.TASK_QUEUE_TYPE_NEXUS { + return nil, serviceerror.NewInvalidArgumentf("worker-commands partitions must have nexus task type, got: %v. base name: %s", taskType, baseName) + } + tq := &TaskQueue{TaskQueueFamily{namespaceId, baseName}, taskType} + return tq.WorkerCommandsPartition(), nil default: tq := &TaskQueue{TaskQueueFamily{namespaceId, baseName}, taskType} return tq.NormalPartition(partition), nil @@ -184,6 +203,8 @@ func PartitionFromPartitionProto(proto *taskqueuespb.TaskQueuePartition, namespa switch proto.GetPartitionId().(type) { case *taskqueuespb.TaskQueuePartition_StickyName: return tq.StickyPartition(proto.GetStickyName()) + case *taskqueuespb.TaskQueuePartition_WorkerCommands: + return tq.WorkerCommandsPartition() default: return tq.NormalPartition(int(proto.GetNormalPartitionId())) } @@ -248,6 +269,10 @@ func (n *TaskQueue) StickyPartition(stickyName string) *StickyPartition { return &StickyPartition{stickyName, n} } +func (n *TaskQueue) WorkerCommandsPartition() *WorkerCommandsPartition { + return &WorkerCommandsPartition{n} +} + func (n *TaskQueue) RootPartition() *NormalPartition { return n.NormalPartition(0) } @@ -311,6 +336,57 @@ func (s *StickyPartition) GradualChangeKey() []byte { return []byte(key) } +func (w *WorkerCommandsPartition) TaskType() enumspb.TaskQueueType { + return w.taskQueue.TaskType() +} + +func (w *WorkerCommandsPartition) Kind() enumspb.TaskQueueKind { + return enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS +} + +func (w *WorkerCommandsPartition) NamespaceId() string { //nolint:stylecheck,staticcheck // matches Partition interface + return w.taskQueue.family.NamespaceId() +} + +func (w *WorkerCommandsPartition) TaskQueue() *TaskQueue { + return w.taskQueue +} + +func (w *WorkerCommandsPartition) IsRoot() bool { + return false +} + +func (w *WorkerCommandsPartition) IsChild() bool { + return false +} + +func (w *WorkerCommandsPartition) PersistenceTTL() time.Duration { return 24 * time.Hour } +func (w *WorkerCommandsPartition) SupportsFairness() bool { return false } +func (w *WorkerCommandsPartition) SupportsVersioning() bool { return false } +func (w *WorkerCommandsPartition) SupportsPartitions() bool { return false } +func (w *WorkerCommandsPartition) MetricTag(bool) string { return "__worker_commands__" } + +func (w *WorkerCommandsPartition) RpcName() string { //nolint:stylecheck,staticcheck // matches Partition interface + return w.taskQueue.Name() +} + +func (w *WorkerCommandsPartition) Key() PartitionKey { + return PartitionKey{ + namespaceId: w.NamespaceId(), + name: w.taskQueue.Name(), + taskType: w.TaskType(), + } +} + +func (w *WorkerCommandsPartition) RoutingKey(int) (string, int) { + return fmt.Sprintf("%s:%s:%d", w.NamespaceId(), w.RpcName(), w.TaskType()), 0 +} + +func (w *WorkerCommandsPartition) GradualChangeKey() []byte { + key := fmt.Sprintf("%s:%s:%d", w.NamespaceId(), w.RpcName(), w.TaskType()) + return []byte(key) +} + func (p *NormalPartition) TaskQueue() *TaskQueue { return p.taskQueue } diff --git a/common/tqid/task_queue_id_test.go b/common/tqid/task_queue_id_test.go index 5c8057151c4..487d799a34e 100644 --- a/common/tqid/task_queue_id_test.go +++ b/common/tqid/task_queue_id_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "strconv" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -56,6 +57,49 @@ func TestFromProtoPartition_Sticky(t *testing.T) { a.True(errors.Is(err, ErrNonZeroSticky)) } +func TestFromProtoPartition_WorkerCommands(t *testing.T) { + nsid := "my-namespace" + queueName := "/temporal-sys/worker-commands/ns/key" + taskType := enumspb.TASK_QUEUE_TYPE_NEXUS + kind := enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS + proto := &taskqueuepb.TaskQueue{ + Name: queueName, + Kind: kind, + } + + p, err := PartitionFromProto(proto, nsid, taskType) + require.NoError(t, err) + require.Equal(t, nsid, p.NamespaceId()) + require.Equal(t, taskType, p.TaskType()) + require.Equal(t, kind, p.Kind()) + require.Equal(t, queueName, p.TaskQueue().Name()) + require.Equal(t, queueName, p.RpcName()) + require.False(t, p.IsRoot()) + require.False(t, p.IsChild()) + require.Equal(t, PartitionKey{nsid, queueName, 0, taskType}, p.Key()) + + // worker-commands cannot have non-zero partition + proto.Name = "/_sys/" + queueName + "/1" + _, err = PartitionFromProto(proto, nsid, taskType) + require.Error(t, err) + + // worker-commands must have nexus task type + proto.Name = queueName + _, err = PartitionFromProto(proto, nsid, enumspb.TASK_QUEUE_TYPE_WORKFLOW) + require.Error(t, err) +} + +func TestWorkerCommandsPartitionProperties(t *testing.T) { + tq := UnsafeTaskQueueFamily("ns", "wc-queue").TaskQueue(enumspb.TASK_QUEUE_TYPE_NEXUS) + p := tq.WorkerCommandsPartition() + + require.Equal(t, 24*time.Hour, p.PersistenceTTL()) + require.False(t, p.SupportsFairness()) + require.False(t, p.SupportsVersioning()) + require.False(t, p.SupportsPartitions()) + require.Equal(t, "__worker_commands__", p.MetricTag(false)) +} + func TestFromProtoPartition_Normal(t *testing.T) { a := assert.New(t) diff --git a/proto/internal/temporal/server/api/taskqueue/v1/message.proto b/proto/internal/temporal/server/api/taskqueue/v1/message.proto index 98a05fabb5e..ac6ebac7673 100644 --- a/proto/internal/temporal/server/api/taskqueue/v1/message.proto +++ b/proto/internal/temporal/server/api/taskqueue/v1/message.proto @@ -92,7 +92,8 @@ message PhysicalTaskQueueInfo { map task_queue_stats_by_priority_key = 4; } -// Represents a normal or sticky partition of a task queue. +// Internal representation of a task queue partition, used for server-to-server RPCs. +// This is the internal equivalent of temporal.api.taskqueue.v1.TaskQueue. message TaskQueuePartition { // This is the user-facing name for this task queue string task_queue = 1; @@ -101,9 +102,12 @@ message TaskQueuePartition { oneof partition_id { int32 normal_partition_id = 3; string sticky_name = 4; + WorkerCommandsPartitionId worker_commands = 5; } } +message WorkerCommandsPartitionId {} + // Information about redirect intention sent by Matching to History in Record*TaskStarted calls. // Deprecated. message BuildIdRedirectInfo { diff --git a/service/matching/matching_engine_test.go b/service/matching/matching_engine_test.go index 612797e1c0c..ceb2386168a 100644 --- a/service/matching/matching_engine_test.go +++ b/service/matching/matching_engine_test.go @@ -5232,6 +5232,128 @@ func (*testTaskManager) CountTaskQueuesByBuildId(context.Context, *persistence.C return 0, nil } +// TestLoggerAndMetricsForPartition_BreakdownEnabled verifies the taskqueue and partition metric +// tags for each task queue kind with the default BreakdownMetricsByTaskQueue=true. +func TestLoggerAndMetricsForPartition_BreakdownEnabled(t *testing.T) { + t.Parallel() + + controller := gomock.NewController(t) + ns, mockNamespaceCache := createMockNamespaceCache(controller, matchingTestNamespace) + config := defaultTestConfig() + e := createTestMatchingEngine(log.NewTestLogger(), controller, config, nil, mockNamespaceCache) + captureHandler := metricstest.NewCaptureHandler() + e.metricsHandler = captureHandler + + tests := []struct { + name string + partition tqid.Partition + expectTQValue string + expectPartitionTag string + }{ + { + name: "normal", + partition: newRootPartition(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_NEXUS), + expectTQValue: "my-task-queue", + expectPartitionTag: "0", + }, + { + name: "worker-commands", + partition: newTestTaskQueue(ns.ID().String(), "/temporal-sys/worker-commands/ns/key", enumspb.TASK_QUEUE_TYPE_NEXUS).WorkerCommandsPartition(), + expectTQValue: "/temporal-sys/worker-commands/ns/key", + expectPartitionTag: "__worker_commands__", + }, + { + name: "sticky", + partition: newTestTaskQueue(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_WORKFLOW).StickyPartition(uuid.NewString()), + expectTQValue: "my-task-queue", + expectPartitionTag: "__sticky__", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + capture := captureHandler.StartCapture() + tqConfig := newTaskQueueConfig(tc.partition.TaskQueue(), config, matchingTestNamespace) + _, _, handler := e.loggerAndMetricsForPartition(ns, tc.partition, tqConfig) + metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1) + snap := capture.Snapshot() + captureHandler.StopCapture(capture) + recordings := snap["poll_success"] + require.NotEmpty(t, recordings, "expected poll_success metric to be recorded") + found := false + for _, rec := range recordings { + if rec.Tags["taskqueue"] == tc.expectTQValue && rec.Tags["partition"] == tc.expectPartitionTag { + found = true + } + } + require.True(t, found, "expected taskqueue=%q partition=%q, got: %v", tc.expectTQValue, tc.expectPartitionTag, recordings) + }) + } +} + +// TestLoggerAndMetricsForPartition_BreakdownDisabled verifies the taskqueue and partition metric +// tags for each task queue kind with BreakdownMetricsByTaskQueue=false. +func TestLoggerAndMetricsForPartition_BreakdownDisabled(t *testing.T) { + t.Parallel() + + controller := gomock.NewController(t) + ns, mockNamespaceCache := createMockNamespaceCache(controller, matchingTestNamespace) + dc := dynamicconfig.StaticClient{ + dynamicconfig.MetricsBreakdownByTaskQueue.Key(): false, + } + config := NewConfig(dynamicconfig.NewCollection(dc, log.NewNoopLogger())) + config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond) + e := createTestMatchingEngine(log.NewTestLogger(), controller, config, nil, mockNamespaceCache) + captureHandler := metricstest.NewCaptureHandler() + e.metricsHandler = captureHandler + + tests := []struct { + name string + partition tqid.Partition + expectTQValue string + expectPartitionTag string + }{ + { + name: "normal", + partition: newRootPartition(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_NEXUS), + expectTQValue: "__omitted__", + expectPartitionTag: "0", + }, + { + name: "sticky", + partition: newTestTaskQueue(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_WORKFLOW).StickyPartition(uuid.NewString()), + expectTQValue: "__omitted__", + expectPartitionTag: "__sticky__", + }, + { + name: "worker-commands", + partition: newTestTaskQueue(ns.ID().String(), "/temporal-sys/worker-commands/ns/key", enumspb.TASK_QUEUE_TYPE_NEXUS).WorkerCommandsPartition(), + expectTQValue: "__omitted__", + expectPartitionTag: "__worker_commands__", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + capture := captureHandler.StartCapture() + tqConfig := newTaskQueueConfig(tc.partition.TaskQueue(), config, matchingTestNamespace) + _, _, handler := e.loggerAndMetricsForPartition(ns, tc.partition, tqConfig) + metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1) + snap := capture.Snapshot() + captureHandler.StopCapture(capture) + recordings := snap["poll_success"] + require.NotEmpty(t, recordings, "expected poll_success metric to be recorded") + found := false + for _, rec := range recordings { + if rec.Tags["taskqueue"] == tc.expectTQValue && rec.Tags["partition"] == tc.expectPartitionTag { + found = true + } + } + require.True(t, found, "expected taskqueue=%q partition=%q, got: %v", tc.expectTQValue, tc.expectPartitionTag, recordings) + }) + } +} + func TestConvertPollWorkflowTaskQueueResponse(t *testing.T) { t.Parallel() diff --git a/service/matching/physical_task_queue_key.go b/service/matching/physical_task_queue_key.go index c7dde2eafc6..bbe81c7468c 100644 --- a/service/matching/physical_task_queue_key.go +++ b/service/matching/physical_task_queue_key.go @@ -117,6 +117,8 @@ func (q *PhysicalTaskQueueKey) PersistenceName() string { switch p := q.Partition().(type) { case *tqid.StickyPartition: return p.StickyName() + case *tqid.WorkerCommandsPartition: + return p.TaskQueue().Name() case *tqid.NormalPartition: baseName := q.TaskQueueFamily().Name() diff --git a/service/matching/physical_task_queue_key_test.go b/service/matching/physical_task_queue_key_test.go index ef5775703a5..c0d6af44812 100644 --- a/service/matching/physical_task_queue_key_test.go +++ b/service/matching/physical_task_queue_key_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/common/tqid" @@ -64,3 +65,10 @@ func TestUnversionedQueueKey(t *testing.T) { a.Equal("", key.Version().BuildId()) a.Nil(key.Version().Deployment()) } + +func TestWorkerCommandsPersistenceName(t *testing.T) { + tq := tqid.UnsafeTaskQueueFamily("ns", "/temporal-sys/worker-commands/ns/key").TaskQueue(enumspb.TASK_QUEUE_TYPE_NEXUS) + p := tq.WorkerCommandsPartition() + key := UnversionedQueueKey(p) + require.Equal(t, "/temporal-sys/worker-commands/ns/key", key.PersistenceName()) +} diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go index ba0cfda7324..187062797b5 100644 --- a/tests/worker_commands_task_test.go +++ b/tests/worker_commands_task_test.go @@ -129,7 +129,7 @@ func TestDispatchCancelToWorker(t *testing.T) { defer pollCancel() resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ Namespace: env.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, Identity: tv.WorkerIdentity(), }) if err == nil && resp != nil && resp.Request != nil {