Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
884103a
Add IsEphemeral() to Partition interface for ephemeral task queue checks
rkannan82 Apr 10, 2026
056da25
Merge branch 'main' into kannan/ephemeral-partition-property
rkannan82 Apr 10, 2026
8e2ff4f
Rename stickyTaskQueueTTL and stickyWorkerAvailable to ephemeral vari…
rkannan82 Apr 10, 2026
2a7094c
Replace IsEphemeral() with fine-grained partition properties
rkannan82 Apr 14, 2026
5486579
Replace HasTTLExpiry with PersistenceTTL and use explicit sticky chec…
rkannan82 Apr 15, 2026
b3d1df6
Add MetricTag() to Partition interface
rkannan82 Apr 15, 2026
a212a6e
Add WorkerCommandsPartition type and partition metrics tag
rkannan82 Apr 10, 2026
28cb3fc
Fix stale test comments
rkannan82 Apr 10, 2026
ca8c453
Clean up test names and comments for consistency
rkannan82 Apr 10, 2026
285da50
Bump go.temporal.io/api to v1.62.9 and fix comment
rkannan82 Apr 15, 2026
a57d54a
Fix lint: nolint for interface method names, require instead of assert
rkannan82 Apr 15, 2026
eb69e07
Remove taskQueueBreakdown override for worker-commands and fix lint
rkannan82 Apr 15, 2026
0cbdeae
Add callback proto to getproto files.go
rkannan82 Apr 15, 2026
f65cc2d
Remove redundant name field from WorkerCommandsPartition and add tests
rkannan82 Apr 15, 2026
13c2a92
Merge remote-tracking branch 'origin/main' into kannan/worker-command…
rkannan82 Apr 15, 2026
691e44d
Fix lint: use require instead of assert in worker-commands tests
rkannan82 Apr 15, 2026
c899cf0
Merge branch 'main' into kannan/worker-commands-partition-metrics
rkannan82 Apr 15, 2026
06159c1
Validate worker-commands partitions must have nexus task type
rkannan82 Apr 15, 2026
1254c02
Fix worker-commands metric test to use correct task queue name
rkannan82 Apr 16, 2026
472aa8a
Update common/tqid/task_queue_id.go
rkannan82 Apr 17, 2026
2f1fed1
Add TODO for worker-commands variant in TaskQueuePartition proto
rkannan82 Apr 17, 2026
f0c646d
Handle WorkerCommandsPartition in PersistenceName
rkannan82 Apr 17, 2026
4055006
Add test for WorkerCommandsPartition PersistenceName and fix unused var
rkannan82 Apr 17, 2026
5c107f2
Bump go.temporal.io/api to match main
rkannan82 Apr 17, 2026
b5ef8f0
Merge remote-tracking branch 'origin/main' into kannan/worker-command…
rkannan82 Apr 17, 2026
fc8b3dc
Use TASK_QUEUE_KIND_WORKER_COMMANDS in e2e test
rkannan82 Apr 17, 2026
34b0229
Add worker-commands variant to TaskQueuePartition proto
rkannan82 Apr 18, 2026
b6f6282
Merge remote-tracking branch 'origin/main' into kannan/worker-command…
rkannan82 Apr 18, 2026
1b1db84
Merge branch 'main' into kannan/worker-commands-partition-metrics
rkannan82 Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions api/taskqueue/v1/message.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

192 changes: 125 additions & 67 deletions api/taskqueue/v1/message.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/metrics/task_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func GetPerTaskQueuePartitionIDScope(
}

// GetPerTaskQueuePartitionTypeScope returns GetPerTaskQueueScope scope plus a "partition" tag which
// can be "__normal__", "__sticky__", or "_unknown_".
// can be "__normal__", "__sticky__", "__worker_commands__", or "_unknown_".
func GetPerTaskQueuePartitionTypeScope(
handler Handler,
namespaceName string,
Expand Down
80 changes: 78 additions & 2 deletions common/tqid/task_queue_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ type (
taskQueue *TaskQueue
}

// WorkerCommandsPartition is used for server-to-worker communication (e.g. activity cancellations).
// These queues are per-worker-process and only exist for the lifetime of the worker process. The SDK sets
// Kind=TASK_QUEUE_KIND_WORKER_COMMANDS when polling on these queues.
WorkerCommandsPartition struct {
// taskQueue is the task queue this partition belongs to; the partition's namespace ID, name,
// and task type are all read from it.
taskQueue *TaskQueue
Comment thread
rkannan82 marked this conversation as resolved.
}

// PartitionKey uniquely identifies a task queue partition, to be used in maps.
// Note that task queue kind (sticky vs normal) and normal name for sticky task queues are not
// part of the task queue partition identity.
Expand All @@ -110,6 +117,7 @@ type (

var _ Partition = (*NormalPartition)(nil)
var _ Partition = (*StickyPartition)(nil)
var _ Partition = (*WorkerCommandsPartition)(nil)

var (
ErrNoParent = errors.New("root task queue partition has no parent")
Expand Down Expand Up @@ -143,11 +151,13 @@ func UnsafePartitionFromProto(proto *taskqueuepb.TaskQueue, namespaceId string,
if err == nil {
return p
}
kind := proto.GetKind()
switch kind { //nolint:exhaustive
switch proto.GetKind() { //nolint:exhaustive
case enumspb.TASK_QUEUE_KIND_STICKY:
tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetNormalName()}, taskType}
return tq.StickyPartition(proto.GetName())
case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS:
tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetName()}, taskType}
return tq.WorkerCommandsPartition()
default:
tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetName()}, taskType}
return tq.RootPartition()
Expand All @@ -173,6 +183,15 @@ func PartitionFromProto(proto *taskqueuepb.TaskQueue, namespaceId string, taskTy
}
tq := &TaskQueue{TaskQueueFamily{namespaceId, normalName}, taskType}
return tq.StickyPartition(baseName), nil
case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS:
Comment thread
rkannan82 marked this conversation as resolved.
if partition != 0 {
return nil, serviceerror.NewInvalidArgumentf("worker-commands partitions cannot have non-zero partition ID. base name: %s", baseName)
}
if taskType != enumspb.TASK_QUEUE_TYPE_NEXUS {
return nil, serviceerror.NewInvalidArgumentf("worker-commands partitions must have nexus task type, got: %v. base name: %s", taskType, baseName)
}
tq := &TaskQueue{TaskQueueFamily{namespaceId, baseName}, taskType}
Comment thread
rkannan82 marked this conversation as resolved.
return tq.WorkerCommandsPartition(), nil
default:
tq := &TaskQueue{TaskQueueFamily{namespaceId, baseName}, taskType}
return tq.NormalPartition(partition), nil
Expand All @@ -184,6 +203,8 @@ func PartitionFromPartitionProto(proto *taskqueuespb.TaskQueuePartition, namespa
switch proto.GetPartitionId().(type) {
case *taskqueuespb.TaskQueuePartition_StickyName:
return tq.StickyPartition(proto.GetStickyName())
case *taskqueuespb.TaskQueuePartition_WorkerCommands:
return tq.WorkerCommandsPartition()
default:
return tq.NormalPartition(int(proto.GetNormalPartitionId()))
}
Expand Down Expand Up @@ -248,6 +269,10 @@ func (n *TaskQueue) StickyPartition(stickyName string) *StickyPartition {
return &StickyPartition{stickyName, n}
}

// WorkerCommandsPartition returns a new worker-commands partition backed by this task queue.
func (n *TaskQueue) WorkerCommandsPartition() *WorkerCommandsPartition {
	partition := &WorkerCommandsPartition{taskQueue: n}
	return partition
}

// RootPartition returns normal partition 0 of this task queue.
func (n *TaskQueue) RootPartition() *NormalPartition {
return n.NormalPartition(0)
}
Expand Down Expand Up @@ -311,6 +336,57 @@ func (s *StickyPartition) GradualChangeKey() []byte {
return []byte(key)
}

// TaskType returns the task type of the task queue this partition belongs to.
func (w *WorkerCommandsPartition) TaskType() enumspb.TaskQueueType {
return w.taskQueue.TaskType()
}

// Kind always reports TASK_QUEUE_KIND_WORKER_COMMANDS for this partition type.
func (w *WorkerCommandsPartition) Kind() enumspb.TaskQueueKind {
return enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS
}

// NamespaceId returns the namespace ID of the task queue family this partition belongs to.
func (w *WorkerCommandsPartition) NamespaceId() string { //nolint:stylecheck,staticcheck // matches Partition interface
return w.taskQueue.family.NamespaceId()
}

// TaskQueue returns the task queue this partition belongs to.
func (w *WorkerCommandsPartition) TaskQueue() *TaskQueue {
return w.taskQueue
}

// IsRoot always returns false for worker-commands partitions.
func (w *WorkerCommandsPartition) IsRoot() bool {
return false
}

// IsChild always returns false for worker-commands partitions.
func (w *WorkerCommandsPartition) IsChild() bool {
return false
}

// PersistenceTTL returns a fixed 24-hour TTL for worker-commands partitions.
func (w *WorkerCommandsPartition) PersistenceTTL() time.Duration { return 24 * time.Hour }
// SupportsFairness always returns false for worker-commands partitions.
func (w *WorkerCommandsPartition) SupportsFairness() bool { return false }
// SupportsVersioning always returns false for worker-commands partitions.
func (w *WorkerCommandsPartition) SupportsVersioning() bool { return false }
// SupportsPartitions always returns false for worker-commands partitions.
func (w *WorkerCommandsPartition) SupportsPartitions() bool { return false }
// MetricTag returns the constant "__worker_commands__"; the bool argument is ignored.
func (w *WorkerCommandsPartition) MetricTag(bool) string { return "__worker_commands__" }

// RpcName returns the name of the underlying task queue, unchanged.
func (w *WorkerCommandsPartition) RpcName() string { //nolint:stylecheck,staticcheck // matches Partition interface
return w.taskQueue.Name()
}

// Key returns the map key identifying this partition: its namespace ID, task queue name, and
// task type. Any other PartitionKey field keeps its zero value.
func (w *WorkerCommandsPartition) Key() PartitionKey {
	var key PartitionKey
	key.namespaceId = w.NamespaceId()
	key.name = w.taskQueue.Name()
	key.taskType = w.TaskType()
	return key
}

// RoutingKey returns "<namespace ID>:<RPC name>:<task type as integer>" as the routing key,
// always paired with 0. The int argument is ignored.
func (w *WorkerCommandsPartition) RoutingKey(int) (string, int) {
	const format = "%s:%s:%d"
	routingKey := fmt.Sprintf(format, w.NamespaceId(), w.RpcName(), w.TaskType())
	return routingKey, 0
}

// GradualChangeKey returns the bytes of "<namespace ID>:<RPC name>:<task type as integer>".
func (w *WorkerCommandsPartition) GradualChangeKey() []byte {
	return []byte(fmt.Sprintf("%s:%s:%d", w.NamespaceId(), w.RpcName(), w.TaskType()))
}

// TaskQueue returns the task queue this partition belongs to.
func (p *NormalPartition) TaskQueue() *TaskQueue {
return p.taskQueue
}
Expand Down
44 changes: 44 additions & 0 deletions common/tqid/task_queue_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,6 +57,49 @@ func TestFromProtoPartition_Sticky(t *testing.T) {
a.True(errors.Is(err, ErrNonZeroSticky))
}

func TestFromProtoPartition_WorkerCommands(t *testing.T) {
nsid := "my-namespace"
queueName := "/temporal-sys/worker-commands/ns/key"
taskType := enumspb.TASK_QUEUE_TYPE_NEXUS
kind := enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS
proto := &taskqueuepb.TaskQueue{
Name: queueName,
Kind: kind,
}

p, err := PartitionFromProto(proto, nsid, taskType)
require.NoError(t, err)
require.Equal(t, nsid, p.NamespaceId())
require.Equal(t, taskType, p.TaskType())
require.Equal(t, kind, p.Kind())
require.Equal(t, queueName, p.TaskQueue().Name())
require.Equal(t, queueName, p.RpcName())
require.False(t, p.IsRoot())
require.False(t, p.IsChild())
require.Equal(t, PartitionKey{nsid, queueName, 0, taskType}, p.Key())

// worker-commands cannot have non-zero partition
proto.Name = "/_sys/" + queueName + "/1"
_, err = PartitionFromProto(proto, nsid, taskType)
require.Error(t, err)

// worker-commands must have nexus task type
proto.Name = queueName
_, err = PartitionFromProto(proto, nsid, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
require.Error(t, err)
}

func TestWorkerCommandsPartitionProperties(t *testing.T) {
tq := UnsafeTaskQueueFamily("ns", "wc-queue").TaskQueue(enumspb.TASK_QUEUE_TYPE_NEXUS)
p := tq.WorkerCommandsPartition()

require.Equal(t, 24*time.Hour, p.PersistenceTTL())
require.False(t, p.SupportsFairness())
require.False(t, p.SupportsVersioning())
require.False(t, p.SupportsPartitions())
require.Equal(t, "__worker_commands__", p.MetricTag(false))
}

func TestFromProtoPartition_Normal(t *testing.T) {
a := assert.New(t)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ message PhysicalTaskQueueInfo {
map<int32, temporal.api.taskqueue.v1.TaskQueueStats> task_queue_stats_by_priority_key = 4;
}

// Represents a normal or sticky partition of a task queue.
// Internal representation of a task queue partition, used for server-to-server RPCs.
// This is the internal equivalent of temporal.api.taskqueue.v1.TaskQueue.
message TaskQueuePartition {
// This is the user-facing name for this task queue
string task_queue = 1;
Expand All @@ -101,9 +102,12 @@ message TaskQueuePartition {
oneof partition_id {
int32 normal_partition_id = 3;
string sticky_name = 4;
WorkerCommandsPartitionId worker_commands = 5;
Comment thread
rkannan82 marked this conversation as resolved.
}
}

// Empty marker message with no fields. It is the type of the worker_commands variant of
// TaskQueuePartition.partition_id.
message WorkerCommandsPartitionId {}

// Information about redirect intention sent by Matching to History in Record*TaskStarted calls.
// Deprecated.
message BuildIdRedirectInfo {
Expand Down
122 changes: 122 additions & 0 deletions service/matching/matching_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5232,6 +5232,128 @@ func (*testTaskManager) CountTaskQueuesByBuildId(context.Context, *persistence.C
return 0, nil
}

// TestLoggerAndMetricsForPartition_BreakdownEnabled checks the "taskqueue" and "partition" tags
// carried by metrics recorded through the handler that loggerAndMetricsForPartition returns, for
// each task queue kind, with the default BreakdownMetricsByTaskQueue=true.
func TestLoggerAndMetricsForPartition_BreakdownEnabled(t *testing.T) {
	t.Parallel()

	ctrl := gomock.NewController(t)
	ns, nsCache := createMockNamespaceCache(ctrl, matchingTestNamespace)
	cfg := defaultTestConfig()
	engine := createTestMatchingEngine(log.NewTestLogger(), ctrl, cfg, nil, nsCache)
	metricsCapture := metricstest.NewCaptureHandler()
	engine.metricsHandler = metricsCapture

	nsID := ns.ID().String()
	const (
		userQueue = "my-task-queue"
		wcQueue   = "/temporal-sys/worker-commands/ns/key"
	)

	cases := []struct {
		name          string
		partition     tqid.Partition
		wantTaskQueue string
		wantPartition string
	}{
		{
			name:          "normal",
			partition:     newRootPartition(nsID, userQueue, enumspb.TASK_QUEUE_TYPE_NEXUS),
			wantTaskQueue: userQueue,
			wantPartition: "0",
		},
		{
			name:          "worker-commands",
			partition:     newTestTaskQueue(nsID, wcQueue, enumspb.TASK_QUEUE_TYPE_NEXUS).WorkerCommandsPartition(),
			wantTaskQueue: wcQueue,
			wantPartition: "__worker_commands__",
		},
		{
			name:          "sticky",
			partition:     newTestTaskQueue(nsID, userQueue, enumspb.TASK_QUEUE_TYPE_WORKFLOW).StickyPartition(uuid.NewString()),
			wantTaskQueue: userQueue,
			wantPartition: "__sticky__",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Record one metric while capturing, then stop the capture before asserting.
			capture := metricsCapture.StartCapture()
			tqConfig := newTaskQueueConfig(tc.partition.TaskQueue(), cfg, matchingTestNamespace)
			_, _, handler := engine.loggerAndMetricsForPartition(ns, tc.partition, tqConfig)
			metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1)
			snapshot := capture.Snapshot()
			metricsCapture.StopCapture(capture)

			recordings := snapshot["poll_success"]
			require.NotEmpty(t, recordings, "expected poll_success metric to be recorded")

			// One recording carrying both expected tag values is enough.
			matched := false
			for i := 0; i < len(recordings) && !matched; i++ {
				tags := recordings[i].Tags
				matched = tags["taskqueue"] == tc.wantTaskQueue && tags["partition"] == tc.wantPartition
			}
			require.True(t, matched, "expected taskqueue=%q partition=%q, got: %v", tc.wantTaskQueue, tc.wantPartition, recordings)
		})
	}
}

// TestLoggerAndMetricsForPartition_BreakdownDisabled checks the "taskqueue" and "partition" tags
// carried by metrics recorded through the handler that loggerAndMetricsForPartition returns, for
// each task queue kind, with BreakdownMetricsByTaskQueue=false.
func TestLoggerAndMetricsForPartition_BreakdownDisabled(t *testing.T) {
	t.Parallel()

	ctrl := gomock.NewController(t)
	ns, nsCache := createMockNamespaceCache(ctrl, matchingTestNamespace)

	// Static dynamic-config override that turns the per-task-queue breakdown off.
	overrides := dynamicconfig.StaticClient{
		dynamicconfig.MetricsBreakdownByTaskQueue.Key(): false,
	}
	cfg := NewConfig(dynamicconfig.NewCollection(overrides, log.NewNoopLogger()))
	cfg.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)

	engine := createTestMatchingEngine(log.NewTestLogger(), ctrl, cfg, nil, nsCache)
	metricsCapture := metricstest.NewCaptureHandler()
	engine.metricsHandler = metricsCapture

	nsID := ns.ID().String()
	const (
		userQueue = "my-task-queue"
		wcQueue   = "/temporal-sys/worker-commands/ns/key"
		omitted   = "__omitted__"
	)

	cases := []struct {
		name          string
		partition     tqid.Partition
		wantTaskQueue string
		wantPartition string
	}{
		{
			name:          "normal",
			partition:     newRootPartition(nsID, userQueue, enumspb.TASK_QUEUE_TYPE_NEXUS),
			wantTaskQueue: omitted,
			wantPartition: "0",
		},
		{
			name:          "sticky",
			partition:     newTestTaskQueue(nsID, userQueue, enumspb.TASK_QUEUE_TYPE_WORKFLOW).StickyPartition(uuid.NewString()),
			wantTaskQueue: omitted,
			wantPartition: "__sticky__",
		},
		{
			name:          "worker-commands",
			partition:     newTestTaskQueue(nsID, wcQueue, enumspb.TASK_QUEUE_TYPE_NEXUS).WorkerCommandsPartition(),
			wantTaskQueue: omitted,
			wantPartition: "__worker_commands__",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Record one metric while capturing, then stop the capture before asserting.
			capture := metricsCapture.StartCapture()
			tqConfig := newTaskQueueConfig(tc.partition.TaskQueue(), cfg, matchingTestNamespace)
			_, _, handler := engine.loggerAndMetricsForPartition(ns, tc.partition, tqConfig)
			metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1)
			snapshot := capture.Snapshot()
			metricsCapture.StopCapture(capture)

			recordings := snapshot["poll_success"]
			require.NotEmpty(t, recordings, "expected poll_success metric to be recorded")

			// One recording carrying both expected tag values is enough.
			matched := false
			for i := 0; i < len(recordings) && !matched; i++ {
				tags := recordings[i].Tags
				matched = tags["taskqueue"] == tc.wantTaskQueue && tags["partition"] == tc.wantPartition
			}
			require.True(t, matched, "expected taskqueue=%q partition=%q, got: %v", tc.wantTaskQueue, tc.wantPartition, recordings)
		})
	}
}

func TestConvertPollWorkflowTaskQueueResponse(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading