Skip to content

Commit 1ae9010

Browse files
rkannan82 and claude committed
Use principal to classify system workers with task queue prefix fallback
When a principal is available from the gRPC context, use its type to determine whether a worker is a system worker (type="temporal"). When no principal is available (propagation disabled), fall back to the existing task queue prefix check. Also fixes the isSystemWorkerWorker typo left over from the previous rename.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8ef1141 commit 1ae9010

5 files changed

Lines changed: 114 additions & 53 deletions

File tree

service/matching/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -596,12 +596,13 @@ func (h *Handler) ListNexusEndpoints(ctx context.Context, request *matchingservi
596596

597597
// RecordWorkerHeartbeat receives a heartbeat request from a worker.
598598
func (h *Handler) RecordWorkerHeartbeat(
599-
_ context.Context, request *matchingservice.RecordWorkerHeartbeatRequest,
599+
ctx context.Context, request *matchingservice.RecordWorkerHeartbeatRequest,
600600
) (*matchingservice.RecordWorkerHeartbeatResponse, error) {
601601
nsID := namespace.ID(request.GetNamespaceId())
602602
nsName := h.namespaceName(nsID)
603+
principal := headers.GetPrincipal(ctx)
603604

604-
h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, request.GetHeartbeartRequest().GetWorkerHeartbeat())
605+
h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, principal, request.GetHeartbeartRequest().GetWorkerHeartbeat())
605606
return &matchingservice.RecordWorkerHeartbeatResponse{}, nil
606607
}
607608

service/matching/workers/registry.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workers
22

33
import (
4+
commonpb "go.temporal.io/api/common/v1"
45
workerpb "go.temporal.io/api/worker/v1"
56
"go.temporal.io/server/common/namespace"
67
)
@@ -21,7 +22,7 @@ type (
2122
}
2223

2324
Registry interface {
24-
RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat)
25+
RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat)
2526
ListWorkers(nsID namespace.ID, params ListWorkersParams) (ListWorkersResponse, error)
2627
DescribeWorker(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error)
2728
}

service/matching/workers/registry_impl.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync/atomic"
1111
"time"
1212

13+
commonpb "go.temporal.io/api/common/v1"
1314
enumspb "go.temporal.io/api/enums/v1"
1415
"go.temporal.io/api/serviceerror"
1516
workerpb "go.temporal.io/api/worker/v1"
@@ -82,7 +83,7 @@ func newBucket() *bucket {
8283
// upsertHeartbeats inserts or refreshes a WorkerHeartbeat under the given namespace.
8384
// Returns the count of added and removed entries separately.
8485
// Workers with WORKER_STATUS_SHUTDOWN are immediately removed from the registry.
85-
func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) {
86+
func (b *bucket) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) {
8687
now := time.Now()
8788

8889
b.mu.Lock()
@@ -107,7 +108,7 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
107108
continue
108109
}
109110

110-
isSystemWorker := primitives.IsInternalTaskQueue(hb.GetTaskQueue())
111+
isSystemWorker := isSystemPrincipal(principal, hb.GetTaskQueue())
111112

112113
// Normal upsert
113114
if e, exists := mp[key]; exists {
@@ -257,9 +258,9 @@ func (m *registryImpl) getBucket(nsID namespace.ID) *bucket {
257258

258259
// upsertHeartbeats records or refreshes WorkerHeartbeats under the given namespace.
259260
// New entries increment the global counter.
260-
func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) {
261+
func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) {
261262
b := m.getBucket(nsID)
262-
added, removed := b.upsertHeartbeats(nsID, heartbeats)
263+
added, removed := b.upsertHeartbeats(nsID, principal, heartbeats)
263264
m.total.Add(added - removed)
264265
if added > 0 {
265266
metrics.WorkerRegistryWorkersAdded.With(m.metricsHandler).Record(added)
@@ -375,8 +376,8 @@ func (m *registryImpl) Stop() {
375376
close(m.quit)
376377
}
377378

378-
func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat) {
379-
m.upsertHeartbeats(nsID, workerHeartbeat)
379+
func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat) {
380+
m.upsertHeartbeats(nsID, principal, workerHeartbeat)
380381
m.metricsEmitter.emit(nsID, nsName, workerHeartbeat)
381382
}
382383

@@ -477,3 +478,14 @@ func (m *registryImpl) DescribeWorker(nsID namespace.ID, workerInstanceKey strin
477478
}
478479
return b.getWorkerHeartbeat(nsID, workerInstanceKey)
479480
}
481+
482+
// isSystemPrincipal determines if a worker is a system worker.
483+
// If a principal is available, it checks whether the principal type indicates
484+
// an internal server identity. Otherwise, it falls back to checking the task
485+
// queue name prefix.
486+
func isSystemPrincipal(principal *commonpb.Principal, taskQueue string) bool {
487+
if principal != nil {
488+
return principal.GetType() == "temporal"
489+
}
490+
return primitives.IsInternalTaskQueue(taskQueue)
491+
}

service/matching/workers/registry_impl_test.go

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
12+
commonpb "go.temporal.io/api/common/v1"
1213
enumspb "go.temporal.io/api/enums/v1"
1314
workerpb "go.temporal.io/api/worker/v1"
1415
"go.temporal.io/server/common/dynamicconfig"
@@ -47,7 +48,7 @@ func TestUpdateAndListNamespace(t *testing.T) {
4748
// Add some heartbeats
4849
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerA", Status: enumspb.WORKER_STATUS_RUNNING}
4950
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING}
50-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
51+
m.upsertHeartbeats("ns1", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb1, hb2})
5152

5253
list = m.filterWorkers("ns1", true /*includeSystemWorkers*/,alwaysTrue)
5354
// Order is not guaranteed; check contents by keys
@@ -92,7 +93,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
9293
// Add two running workers
9394
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker1", Status: enumspb.WORKER_STATUS_RUNNING}
9495
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker2", Status: enumspb.WORKER_STATUS_RUNNING}
95-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
96+
m.upsertHeartbeats("ns1", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb1, hb2})
9697

9798
// Verify both workers are registered
9899
list := m.filterWorkers("ns1", true /*includeSystemWorkers*/,alwaysTrue)
@@ -101,7 +102,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
101102

102103
// Worker1 sends shutdown status
103104
hbShutdown := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker1", Status: enumspb.WORKER_STATUS_SHUTDOWN}
104-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown})
105+
m.upsertHeartbeats("ns1", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hbShutdown})
105106

106107
// Verify only worker1 is removed, worker2 remains
107108
list = m.filterWorkers("ns1", true /*includeSystemWorkers*/,alwaysTrue)
@@ -139,7 +140,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) {
139140

140141
// Send shutdown for non-existent worker - should be a no-op
141142
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "unknown", Status: enumspb.WORKER_STATUS_SHUTDOWN}
142-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb})
143+
m.upsertHeartbeats("ns1", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
143144

144145
// Verify nothing happened
145146
list := m.filterWorkers("ns1", true /*includeSystemWorkers*/,alwaysTrue)
@@ -165,7 +166,7 @@ func TestListNamespacePredicate(t *testing.T) {
165166
for i := 1; i <= 5; i++ {
166167
key := fmt.Sprintf("key%d", i)
167168
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key, CurrentStickyCacheSize: int32(i)}
168-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
169+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
169170
}
170171

171172
// Table-driven tests for predicates
@@ -191,7 +192,7 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
191192
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
192193
defer m.Stop()
193194

194-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{
195+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{
195196
{WorkerInstanceKey: "user-1", TaskQueue: "my-queue"},
196197
{WorkerInstanceKey: "user-2", TaskQueue: "my-queue"},
197198
{WorkerInstanceKey: "sys-1", TaskQueue: "temporal-sys-per-ns-tq"},
@@ -211,6 +212,52 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
211212
})
212213
}
213214

215+
func TestIsSystemPrincipal(t *testing.T) {
216+
t.Run("principal with type temporal marks as system worker", func(t *testing.T) {
217+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
218+
defer m.Stop()
219+
220+
principal := &commonpb.Principal{Type: "temporal", Name: "internal"}
221+
m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{
222+
{WorkerInstanceKey: "sys-worker", TaskQueue: "any-queue"},
223+
})
224+
225+
list := m.filterWorkers("ns", false /*includeSystemWorkers*/, alwaysTrue)
226+
require.Empty(t, list, "worker with temporal principal should be excluded")
227+
228+
list = m.filterWorkers("ns", true /*includeSystemWorkers*/, alwaysTrue)
229+
require.Len(t, list, 1)
230+
})
231+
232+
t.Run("principal with non-temporal type is not system worker", func(t *testing.T) {
233+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
234+
defer m.Stop()
235+
236+
principal := &commonpb.Principal{Type: "user", Name: "alice"}
237+
m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{
238+
{WorkerInstanceKey: "user-worker", TaskQueue: "temporal-sys-per-ns-tq"},
239+
})
240+
241+
// Even though task queue has system prefix, principal says it's a user
242+
list := m.filterWorkers("ns", false /*includeSystemWorkers*/, alwaysTrue)
243+
require.Len(t, list, 1, "worker with user principal should not be excluded")
244+
})
245+
246+
t.Run("nil principal falls back to task queue prefix", func(t *testing.T) {
247+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
248+
defer m.Stop()
249+
250+
m.upsertHeartbeats("ns", nil /*principal*/, []*workerpb.WorkerHeartbeat{
251+
{WorkerInstanceKey: "sys-worker", TaskQueue: "temporal-sys-per-ns-tq"},
252+
{WorkerInstanceKey: "user-worker", TaskQueue: "my-queue"},
253+
})
254+
255+
list := m.filterWorkers("ns", false /*includeSystemWorkers*/, alwaysTrue)
256+
require.Len(t, list, 1)
257+
require.Equal(t, "user-worker", list[0].WorkerInstanceKey)
258+
})
259+
}
260+
214261
func TestEvictByTTL(t *testing.T) {
215262
// Use capture handler to verify metrics
216263
captureHandler := metricstest.NewCaptureHandler()
@@ -231,7 +278,7 @@ func TestEvictByTTL(t *testing.T) {
231278
defer m.Stop()
232279

233280
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "oldWorker"}
234-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
281+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
235282

236283
// Manually move beyond TTL
237284
b := m.getBucket("ns")
@@ -279,7 +326,7 @@ func TestEvictByCapacity(t *testing.T) {
279326
for i := 1; i <= 5; i++ {
280327
key := fmt.Sprintf("cap%d", i)
281328
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
282-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
329+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
283330
}
284331

285332
// All entries have lastSeen.Before(now) when MinEvictAge=0, so eligible
@@ -337,7 +384,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
337384
for i := 1; i <= 3; i++ {
338385
key := fmt.Sprintf("worker%d", i)
339386
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
340-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
387+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
341388
}
342389

343390
// Verify we're over capacity
@@ -389,7 +436,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) {
389436
for i := 1; i <= 3; i++ {
390437
key := fmt.Sprintf("worker%d", i)
391438
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
392-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
439+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
393440
}
394441

395442
// Virtual time advance - instant with synctest!
@@ -439,14 +486,14 @@ func TestMultipleNamespaces(t *testing.T) {
439486
{WorkerInstanceKey: "ns1-worker2", TaskQueue: "queue1"},
440487
{WorkerInstanceKey: "ns1-worker3", TaskQueue: "queue2"},
441488
}
442-
m.upsertHeartbeats("namespace1", ns1Workers)
489+
m.upsertHeartbeats("namespace1", nil /*principal*/,ns1Workers)
443490

444491
// Add 2 workers to namespace2
445492
ns2Workers := []*workerpb.WorkerHeartbeat{
446493
{WorkerInstanceKey: "ns2-worker1", TaskQueue: "queue3"},
447494
{WorkerInstanceKey: "ns2-worker2", TaskQueue: "queue3"},
448495
}
449-
m.upsertHeartbeats("namespace2", ns2Workers)
496+
m.upsertHeartbeats("namespace2", nil /*principal*/,ns2Workers)
450497

451498
// Verify functional behavior first
452499
ns1List := m.filterWorkers("namespace1", true /*includeSystemWorkers*/,alwaysTrue)
@@ -504,7 +551,7 @@ func TestEvictLoopRecordsUtilizationMetric(t *testing.T) {
504551
for i := 1; i <= 3; i++ {
505552
key := fmt.Sprintf("worker%d", i)
506553
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
507-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
554+
m.upsertHeartbeats("ns", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
508555
}
509556

510557
// Verify initial state
@@ -548,7 +595,7 @@ func BenchmarkUpdate(b *testing.B) {
548595
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "benchWorker"}
549596
b.ResetTimer()
550597
for i := 0; i < b.N; i++ {
551-
m.upsertHeartbeats("benchNs", []*workerpb.WorkerHeartbeat{hb})
598+
m.upsertHeartbeats("benchNs", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
552599
}
553600
}
554601

@@ -569,7 +616,7 @@ func BenchmarkListNamespace(b *testing.B) {
569616
for i := range 1000 {
570617
key := fmt.Sprintf("worker%d", i)
571618
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
572-
m.upsertHeartbeats("benchNs", []*workerpb.WorkerHeartbeat{hb})
619+
m.upsertHeartbeats("benchNs", nil /*principal*/,[]*workerpb.WorkerHeartbeat{hb})
573620
}
574621
b.ResetTimer()
575622
for i := 0; i < b.N; i++ {
@@ -604,7 +651,7 @@ func BenchmarkRandomUpdate(b *testing.B) {
604651
for i := range totalHeartbeats {
605652
key := fmt.Sprintf("%s-worker%d", ns, i)
606653
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key, CurrentStickyCacheSize: int32(i)}
607-
m.upsertHeartbeats(ns, []*workerpb.WorkerHeartbeat{hb})
654+
m.upsertHeartbeats(ns, nil /*principal*/, []*workerpb.WorkerHeartbeat{hb})
608655
pairs = append(pairs, pair{ns: ns, hb: hb})
609656
}
610657
}
@@ -613,7 +660,7 @@ func BenchmarkRandomUpdate(b *testing.B) {
613660
b.ResetTimer()
614661
for i := 0; i < b.N; i++ {
615662
p := pairs[r.Intn(len(pairs))]
616-
m.upsertHeartbeats(p.ns, []*workerpb.WorkerHeartbeat{p.hb})
663+
m.upsertHeartbeats(p.ns, nil /*principal*/, []*workerpb.WorkerHeartbeat{p.hb})
617664
}
618665
}
619666

@@ -662,7 +709,7 @@ func TestActivitySlotsMetric(t *testing.T) {
662709

663710
testNamespace := tv.NamespaceID()
664711
testNamespaceName := namespace.Name(testNamespace + "_name")
665-
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
712+
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /*principal*/, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
666713

667714
// Verify activity slots metrics
668715
snapshot := capture.Snapshot()
@@ -739,7 +786,7 @@ func TestPluginMetricsExported(t *testing.T) {
739786

740787
testNamespace := tv.NamespaceID()
741788
testNamespaceName := namespace.Name(testNamespace + "_name")
742-
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
789+
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /*principal*/, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
743790

744791
// Verify plugin metrics - should have exactly 3 recordings despite plugin-a being in both workers
745792
snapshot := capture.Snapshot()
@@ -801,7 +848,7 @@ func TestPluginMetricsDisabled(t *testing.T) {
801848
// Upsert heartbeats for test namespace
802849
testNamespace := tv.NamespaceID()
803850
testNamespaceName := namespace.Name(testNamespace + "_name")
804-
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1})
851+
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /*principal*/, []*workerpb.WorkerHeartbeat{worker1})
805852

806853
// Verify no plugin metrics are recorded when disabled
807854
snapshot := capture.Snapshot()

0 commit comments

Comments
 (0)