Skip to content

Commit 18d5ae9

Browse files
rkannan82 and claude
authored
Use principal to classify system workers (#10143)
## What

Use the gRPC principal to classify workers as system workers at upsert time, falling back to task queue prefix matching when the principal is not available.

## Why

Principal-based classification is more robust than string matching on task queue names, and it works correctly when principal propagation is enabled.

## How did you test it?

Unit tests covering four scenarios:

- the internal temporal principal marks a worker as a system worker
- a temporal-type principal with a non-internal name does not mark a worker as a system worker
- a non-temporal principal overrides the task queue prefix
- a nil principal falls back to prefix matching

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 285606a commit 18d5ae9

5 files changed

Lines changed: 129 additions & 53 deletions

File tree

service/matching/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,12 +590,13 @@ func (h *Handler) ListNexusEndpoints(ctx context.Context, request *matchingservi
590590

591591
// RecordWorkerHeartbeat receive heartbeat request from the worker.
592592
func (h *Handler) RecordWorkerHeartbeat(
593-
_ context.Context, request *matchingservice.RecordWorkerHeartbeatRequest,
593+
ctx context.Context, request *matchingservice.RecordWorkerHeartbeatRequest,
594594
) (*matchingservice.RecordWorkerHeartbeatResponse, error) {
595595
nsID := namespace.ID(request.GetNamespaceId())
596596
nsName := h.namespaceName(nsID)
597+
principal := headers.GetPrincipal(ctx)
597598

598-
h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, request.GetHeartbeartRequest().GetWorkerHeartbeat())
599+
h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, principal, request.GetHeartbeartRequest().GetWorkerHeartbeat())
599600
return &matchingservice.RecordWorkerHeartbeatResponse{}, nil
600601
}
601602

service/matching/workers/registry.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workers
22

33
import (
4+
commonpb "go.temporal.io/api/common/v1"
45
workerpb "go.temporal.io/api/worker/v1"
56
"go.temporal.io/server/common/namespace"
67
)
@@ -21,7 +22,7 @@ type (
2122
}
2223

2324
Registry interface {
24-
RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat)
25+
RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat)
2526
ListWorkers(nsID namespace.ID, params ListWorkersParams) (ListWorkersResponse, error)
2627
DescribeWorker(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error)
2728
}

service/matching/workers/registry_impl.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010
"sync/atomic"
1111
"time"
1212

13+
commonpb "go.temporal.io/api/common/v1"
1314
enumspb "go.temporal.io/api/enums/v1"
1415
"go.temporal.io/api/serviceerror"
1516
workerpb "go.temporal.io/api/worker/v1"
17+
"go.temporal.io/server/common/authorization"
1618
"go.temporal.io/server/common/dynamicconfig"
1719
"go.temporal.io/server/common/metrics"
1820
"go.temporal.io/server/common/namespace"
@@ -82,7 +84,7 @@ func newBucket() *bucket {
8284
// upsertHeartbeats inserts or refreshes a WorkerHeartbeat under the given namespace.
8385
// Returns the count of added and removed entries separately.
8486
// Workers with WORKER_STATUS_SHUTDOWN are immediately removed from the registry.
85-
func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) {
87+
func (b *bucket) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) {
8688
now := time.Now()
8789

8890
b.mu.Lock()
@@ -107,7 +109,7 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
107109
continue
108110
}
109111

110-
isSystemWorker := primitives.IsInternalTaskQueue(hb.GetTaskQueue())
112+
isSystemWorker := isSystemWorker(principal, hb.GetTaskQueue())
111113

112114
// Normal upsert
113115
if e, exists := mp[key]; exists {
@@ -257,9 +259,9 @@ func (m *registryImpl) getBucket(nsID namespace.ID) *bucket {
257259

258260
// upsertHeartbeats records or refreshes WorkerHeartbeats under the given namespace.
259261
// New entries increment the global counter.
260-
func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) {
262+
func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) {
261263
b := m.getBucket(nsID)
262-
added, removed := b.upsertHeartbeats(nsID, heartbeats)
264+
added, removed := b.upsertHeartbeats(nsID, principal, heartbeats)
263265
m.total.Add(added - removed)
264266
if added > 0 {
265267
metrics.WorkerRegistryWorkersAdded.With(m.metricsHandler).Record(added)
@@ -375,8 +377,8 @@ func (m *registryImpl) Stop() {
375377
close(m.quit)
376378
}
377379

378-
func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat) {
379-
m.upsertHeartbeats(nsID, workerHeartbeat)
380+
func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat) {
381+
m.upsertHeartbeats(nsID, principal, workerHeartbeat)
380382
m.metricsEmitter.emit(nsID, nsName, workerHeartbeat)
381383
}
382384

@@ -477,3 +479,14 @@ func (m *registryImpl) DescribeWorker(nsID namespace.ID, workerInstanceKey strin
477479
}
478480
return b.getWorkerHeartbeat(nsID, workerInstanceKey)
479481
}
482+
483+
// isSystemWorker determines if a worker is a system worker.
484+
// If a principal is available, it checks whether the principal identifies
485+
// the Temporal server itself (type="temporal", name="internal"). Otherwise,
486+
// it falls back to checking the task queue name prefix.
487+
func isSystemWorker(principal *commonpb.Principal, taskQueue string) bool {
488+
if principal != nil {
489+
return principal.GetType() == authorization.InternalPrincipalType && principal.GetName() == authorization.InternalPrincipalName
490+
}
491+
return primitives.IsInternalTaskQueue(taskQueue)
492+
}

service/matching/workers/registry_impl_test.go

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99

1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
12+
commonpb "go.temporal.io/api/common/v1"
1213
enumspb "go.temporal.io/api/enums/v1"
1314
workerpb "go.temporal.io/api/worker/v1"
15+
"go.temporal.io/server/common/authorization"
1416
"go.temporal.io/server/common/dynamicconfig"
1517
"go.temporal.io/server/common/metrics"
1618
"go.temporal.io/server/common/metrics/metricstest"
@@ -47,7 +49,7 @@ func TestUpdateAndListNamespace(t *testing.T) {
4749
// Add some heartbeats
4850
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerA", Status: enumspb.WORKER_STATUS_RUNNING}
4951
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING}
50-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
52+
m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hb1, hb2})
5153

5254
list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
5355
// Order is not guaranteed; check contents by keys
@@ -92,7 +94,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
9294
// Add two running workers
9395
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker1", Status: enumspb.WORKER_STATUS_RUNNING}
9496
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker2", Status: enumspb.WORKER_STATUS_RUNNING}
95-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
97+
m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hb1, hb2})
9698

9799
// Verify both workers are registered
98100
list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
@@ -101,7 +103,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
101103

102104
// Worker1 sends shutdown status
103105
hbShutdown := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker1", Status: enumspb.WORKER_STATUS_SHUTDOWN}
104-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown})
106+
m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hbShutdown})
105107

106108
// Verify only worker1 is removed, worker2 remains
107109
list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
@@ -139,7 +141,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) {
139141

140142
// Send shutdown for non-existent worker - should be a no-op
141143
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "unknown", Status: enumspb.WORKER_STATUS_SHUTDOWN}
142-
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb})
144+
m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
143145

144146
// Verify nothing happened
145147
list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
@@ -165,7 +167,7 @@ func TestListNamespacePredicate(t *testing.T) {
165167
for i := 1; i <= 5; i++ {
166168
key := fmt.Sprintf("key%d", i)
167169
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key, CurrentStickyCacheSize: int32(i)}
168-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
170+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
169171
}
170172

171173
// Table-driven tests for predicates
@@ -191,7 +193,7 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
191193
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
192194
defer m.Stop()
193195

194-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{
196+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{
195197
{WorkerInstanceKey: "user-1", TaskQueue: "my-queue"},
196198
{WorkerInstanceKey: "user-2", TaskQueue: "my-queue"},
197199
{WorkerInstanceKey: "sys-1", TaskQueue: "temporal-sys-per-ns-tq"},
@@ -211,6 +213,65 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
211213
})
212214
}
213215

216+
func TestIsSystemWorker(t *testing.T) {
217+
t.Run("principal with type temporal marks as system worker", func(t *testing.T) {
218+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
219+
defer m.Stop()
220+
221+
principal := &commonpb.Principal{Type: authorization.InternalPrincipalType, Name: authorization.InternalPrincipalName}
222+
m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{
223+
{WorkerInstanceKey: "sys-worker", TaskQueue: "any-queue"},
224+
})
225+
226+
list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue)
227+
require.Empty(t, list, "worker with temporal principal should be excluded")
228+
229+
list = m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue)
230+
require.Len(t, list, 1)
231+
})
232+
233+
t.Run("principal with type temporal but non-internal name is not system worker", func(t *testing.T) {
234+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
235+
defer m.Stop()
236+
237+
principal := &commonpb.Principal{Type: authorization.InternalPrincipalType, Name: "other"}
238+
m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{
239+
{WorkerInstanceKey: "worker", TaskQueue: "any-queue"},
240+
})
241+
242+
list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue)
243+
require.Len(t, list, 1, "worker with non-internal name should not be excluded")
244+
})
245+
246+
t.Run("principal with non-temporal type is not system worker", func(t *testing.T) {
247+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
248+
defer m.Stop()
249+
250+
principal := &commonpb.Principal{Type: "user", Name: "alice"}
251+
m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{
252+
{WorkerInstanceKey: "user-worker", TaskQueue: "temporal-sys-per-ns-tq"},
253+
})
254+
255+
// Even though task queue has system prefix, principal says it's a user
256+
list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue)
257+
require.Len(t, list, 1, "worker with user principal should not be excluded")
258+
})
259+
260+
t.Run("nil principal falls back to task queue prefix", func(t *testing.T) {
261+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
262+
defer m.Stop()
263+
264+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{
265+
{WorkerInstanceKey: "sys-worker", TaskQueue: "temporal-sys-per-ns-tq"},
266+
{WorkerInstanceKey: "user-worker", TaskQueue: "my-queue"},
267+
})
268+
269+
list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue)
270+
require.Len(t, list, 1)
271+
require.Equal(t, "user-worker", list[0].WorkerInstanceKey)
272+
})
273+
}
274+
214275
func TestEvictByTTL(t *testing.T) {
215276
// Use capture handler to verify metrics
216277
captureHandler := metricstest.NewCaptureHandler()
@@ -231,7 +292,7 @@ func TestEvictByTTL(t *testing.T) {
231292
defer m.Stop()
232293

233294
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "oldWorker"}
234-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
295+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
235296

236297
// Manually move beyond TTL
237298
b := m.getBucket("ns")
@@ -279,7 +340,7 @@ func TestEvictByCapacity(t *testing.T) {
279340
for i := 1; i <= 5; i++ {
280341
key := fmt.Sprintf("cap%d", i)
281342
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
282-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
343+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
283344
}
284345

285346
// All entries have lastSeen.Before(now) when MinEvictAge=0, so eligible
@@ -337,7 +398,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
337398
for i := 1; i <= 3; i++ {
338399
key := fmt.Sprintf("worker%d", i)
339400
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
340-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
401+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
341402
}
342403

343404
// Verify we're over capacity
@@ -389,7 +450,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) {
389450
for i := 1; i <= 3; i++ {
390451
key := fmt.Sprintf("worker%d", i)
391452
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
392-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
453+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
393454
}
394455

395456
// Virtual time advance - instant with synctest!
@@ -439,14 +500,14 @@ func TestMultipleNamespaces(t *testing.T) {
439500
{WorkerInstanceKey: "ns1-worker2", TaskQueue: "queue1"},
440501
{WorkerInstanceKey: "ns1-worker3", TaskQueue: "queue2"},
441502
}
442-
m.upsertHeartbeats("namespace1", ns1Workers)
503+
m.upsertHeartbeats("namespace1", nil /* principal */, ns1Workers)
443504

444505
// Add 2 workers to namespace2
445506
ns2Workers := []*workerpb.WorkerHeartbeat{
446507
{WorkerInstanceKey: "ns2-worker1", TaskQueue: "queue3"},
447508
{WorkerInstanceKey: "ns2-worker2", TaskQueue: "queue3"},
448509
}
449-
m.upsertHeartbeats("namespace2", ns2Workers)
510+
m.upsertHeartbeats("namespace2", nil /* principal */, ns2Workers)
450511

451512
// Verify functional behavior first
452513
ns1List := m.filterWorkers("namespace1", true /* includeSystemWorkers */, alwaysTrue)
@@ -504,7 +565,7 @@ func TestEvictLoopRecordsUtilizationMetric(t *testing.T) {
504565
for i := 1; i <= 3; i++ {
505566
key := fmt.Sprintf("worker%d", i)
506567
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
507-
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
568+
m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
508569
}
509570

510571
// Verify initial state
@@ -548,7 +609,7 @@ func BenchmarkUpdate(b *testing.B) {
548609
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "benchWorker"}
549610
b.ResetTimer()
550611
for i := 0; i < b.N; i++ {
551-
m.upsertHeartbeats("benchNs", []*workerpb.WorkerHeartbeat{hb})
612+
m.upsertHeartbeats("benchNs", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
552613
}
553614
}
554615

@@ -569,7 +630,7 @@ func BenchmarkListNamespace(b *testing.B) {
569630
for i := range 1000 {
570631
key := fmt.Sprintf("worker%d", i)
571632
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
572-
m.upsertHeartbeats("benchNs", []*workerpb.WorkerHeartbeat{hb})
633+
m.upsertHeartbeats("benchNs", nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
573634
}
574635
b.ResetTimer()
575636
for i := 0; i < b.N; i++ {
@@ -604,7 +665,7 @@ func BenchmarkRandomUpdate(b *testing.B) {
604665
for i := range totalHeartbeats {
605666
key := fmt.Sprintf("%s-worker%d", ns, i)
606667
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key, CurrentStickyCacheSize: int32(i)}
607-
m.upsertHeartbeats(ns, []*workerpb.WorkerHeartbeat{hb})
668+
m.upsertHeartbeats(ns, nil /* principal */, []*workerpb.WorkerHeartbeat{hb})
608669
pairs = append(pairs, pair{ns: ns, hb: hb})
609670
}
610671
}
@@ -613,7 +674,7 @@ func BenchmarkRandomUpdate(b *testing.B) {
613674
b.ResetTimer()
614675
for i := 0; i < b.N; i++ {
615676
p := pairs[r.Intn(len(pairs))]
616-
m.upsertHeartbeats(p.ns, []*workerpb.WorkerHeartbeat{p.hb})
677+
m.upsertHeartbeats(p.ns, nil /* principal */, []*workerpb.WorkerHeartbeat{p.hb})
617678
}
618679
}
619680

@@ -662,7 +723,7 @@ func TestActivitySlotsMetric(t *testing.T) {
662723

663724
testNamespace := tv.NamespaceID()
664725
testNamespaceName := namespace.Name(testNamespace + "_name")
665-
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
726+
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /* principal */, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
666727

667728
// Verify activity slots metrics
668729
snapshot := capture.Snapshot()
@@ -739,7 +800,7 @@ func TestPluginMetricsExported(t *testing.T) {
739800

740801
testNamespace := tv.NamespaceID()
741802
testNamespaceName := namespace.Name(testNamespace + "_name")
742-
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
803+
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /* principal */, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3})
743804

744805
// Verify plugin metrics - should have exactly 3 recordings despite plugin-a being in both workers
745806
snapshot := capture.Snapshot()
@@ -801,7 +862,7 @@ func TestPluginMetricsDisabled(t *testing.T) {
801862
// Upsert heartbeats for test namespace
802863
testNamespace := tv.NamespaceID()
803864
testNamespaceName := namespace.Name(testNamespace + "_name")
804-
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1})
865+
m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /* principal */, []*workerpb.WorkerHeartbeat{worker1})
805866

806867
// Verify no plugin metrics are recorded when disabled
807868
snapshot := capture.Snapshot()

0 commit comments

Comments
 (0)