99
1010 "github.com/stretchr/testify/assert"
1111 "github.com/stretchr/testify/require"
12+ commonpb "go.temporal.io/api/common/v1"
1213 enumspb "go.temporal.io/api/enums/v1"
1314 workerpb "go.temporal.io/api/worker/v1"
1415 "go.temporal.io/server/common/dynamicconfig"
@@ -47,7 +48,7 @@ func TestUpdateAndListNamespace(t *testing.T) {
4748 // Add some heartbeats
4849 hb1 := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "workerA" , Status : enumspb .WORKER_STATUS_RUNNING }
4950 hb2 := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "workerB" , Status : enumspb .WORKER_STATUS_RUNNING }
50- m .upsertHeartbeats ("ns1" , []* workerpb.WorkerHeartbeat {hb1 , hb2 })
51+ m .upsertHeartbeats ("ns1" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb1 , hb2 })
5152
5253 list = m .filterWorkers ("ns1" , true /*includeSystemWorkers*/ ,alwaysTrue )
5354 // Order is not guaranteed; check contents by keys
@@ -92,7 +93,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
9293 // Add two running workers
9394 hb1 := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "worker1" , Status : enumspb .WORKER_STATUS_RUNNING }
9495 hb2 := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "worker2" , Status : enumspb .WORKER_STATUS_RUNNING }
95- m .upsertHeartbeats ("ns1" , []* workerpb.WorkerHeartbeat {hb1 , hb2 })
96+ m .upsertHeartbeats ("ns1" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb1 , hb2 })
9697
9798 // Verify both workers are registered
9899 list := m .filterWorkers ("ns1" , true /*includeSystemWorkers*/ ,alwaysTrue )
@@ -101,7 +102,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
101102
102103 // Worker1 sends shutdown status
103104 hbShutdown := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "worker1" , Status : enumspb .WORKER_STATUS_SHUTDOWN }
104- m .upsertHeartbeats ("ns1" , []* workerpb.WorkerHeartbeat {hbShutdown })
105+ m .upsertHeartbeats ("ns1" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hbShutdown })
105106
106107 // Verify only worker1 is removed, worker2 remains
107108 list = m .filterWorkers ("ns1" , true /*includeSystemWorkers*/ ,alwaysTrue )
@@ -139,7 +140,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) {
139140
140141 // Send shutdown for non-existent worker - should be a no-op
141142 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "unknown" , Status : enumspb .WORKER_STATUS_SHUTDOWN }
142- m .upsertHeartbeats ("ns1" , []* workerpb.WorkerHeartbeat {hb })
143+ m .upsertHeartbeats ("ns1" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
143144
144145 // Verify nothing happened
145146 list := m .filterWorkers ("ns1" , true /*includeSystemWorkers*/ ,alwaysTrue )
@@ -165,7 +166,7 @@ func TestListNamespacePredicate(t *testing.T) {
165166 for i := 1 ; i <= 5 ; i ++ {
166167 key := fmt .Sprintf ("key%d" , i )
167168 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key , CurrentStickyCacheSize : int32 (i )}
168- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {hb })
169+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
169170 }
170171
171172 // Table-driven tests for predicates
@@ -191,7 +192,7 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
191192 m := newRegistryImpl (testDefaultRegistryParams (metrics .NoopMetricsHandler ))
192193 defer m .Stop ()
193194
194- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {
195+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {
195196 {WorkerInstanceKey : "user-1" , TaskQueue : "my-queue" },
196197 {WorkerInstanceKey : "user-2" , TaskQueue : "my-queue" },
197198 {WorkerInstanceKey : "sys-1" , TaskQueue : "temporal-sys-per-ns-tq" },
@@ -211,6 +212,52 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
211212 })
212213}
213214
215+ func TestIsSystemPrincipal (t * testing.T ) {
216+ t .Run ("principal with type temporal marks as system worker" , func (t * testing.T ) {
217+ m := newRegistryImpl (testDefaultRegistryParams (metrics .NoopMetricsHandler ))
218+ defer m .Stop ()
219+
220+ principal := & commonpb.Principal {Type : "temporal" , Name : "internal" }
221+ m .upsertHeartbeats ("ns" , principal , []* workerpb.WorkerHeartbeat {
222+ {WorkerInstanceKey : "sys-worker" , TaskQueue : "any-queue" },
223+ })
224+
225+ list := m .filterWorkers ("ns" , false /*includeSystemWorkers*/ , alwaysTrue )
226+ require .Empty (t , list , "worker with temporal principal should be excluded" )
227+
228+ list = m .filterWorkers ("ns" , true /*includeSystemWorkers*/ , alwaysTrue )
229+ require .Len (t , list , 1 )
230+ })
231+
232+ t .Run ("principal with non-temporal type is not system worker" , func (t * testing.T ) {
233+ m := newRegistryImpl (testDefaultRegistryParams (metrics .NoopMetricsHandler ))
234+ defer m .Stop ()
235+
236+ principal := & commonpb.Principal {Type : "user" , Name : "alice" }
237+ m .upsertHeartbeats ("ns" , principal , []* workerpb.WorkerHeartbeat {
238+ {WorkerInstanceKey : "user-worker" , TaskQueue : "temporal-sys-per-ns-tq" },
239+ })
240+
241+ // Even though task queue has system prefix, principal says it's a user
242+ list := m .filterWorkers ("ns" , false /*includeSystemWorkers*/ , alwaysTrue )
243+ require .Len (t , list , 1 , "worker with user principal should not be excluded" )
244+ })
245+
246+ t .Run ("nil principal falls back to task queue prefix" , func (t * testing.T ) {
247+ m := newRegistryImpl (testDefaultRegistryParams (metrics .NoopMetricsHandler ))
248+ defer m .Stop ()
249+
250+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {
251+ {WorkerInstanceKey : "sys-worker" , TaskQueue : "temporal-sys-per-ns-tq" },
252+ {WorkerInstanceKey : "user-worker" , TaskQueue : "my-queue" },
253+ })
254+
255+ list := m .filterWorkers ("ns" , false /*includeSystemWorkers*/ , alwaysTrue )
256+ require .Len (t , list , 1 )
257+ require .Equal (t , "user-worker" , list [0 ].WorkerInstanceKey )
258+ })
259+ }
260+
214261func TestEvictByTTL (t * testing.T ) {
215262 // Use capture handler to verify metrics
216263 captureHandler := metricstest .NewCaptureHandler ()
@@ -231,7 +278,7 @@ func TestEvictByTTL(t *testing.T) {
231278 defer m .Stop ()
232279
233280 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "oldWorker" }
234- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {hb })
281+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
235282
236283 // Manually move beyond TTL
237284 b := m .getBucket ("ns" )
@@ -279,7 +326,7 @@ func TestEvictByCapacity(t *testing.T) {
279326 for i := 1 ; i <= 5 ; i ++ {
280327 key := fmt .Sprintf ("cap%d" , i )
281328 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key }
282- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {hb })
329+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
283330 }
284331
285332 // All entries have lastSeen.Before(now) when MinEvictAge=0, so eligible
@@ -337,7 +384,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
337384 for i := 1 ; i <= 3 ; i ++ {
338385 key := fmt .Sprintf ("worker%d" , i )
339386 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key }
340- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {hb })
387+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
341388 }
342389
343390 // Verify we're over capacity
@@ -389,7 +436,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) {
389436 for i := 1 ; i <= 3 ; i ++ {
390437 key := fmt .Sprintf ("worker%d" , i )
391438 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key }
392- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {hb })
439+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
393440 }
394441
395442 // Virtual time advance - instant with synctest!
@@ -439,14 +486,14 @@ func TestMultipleNamespaces(t *testing.T) {
439486 {WorkerInstanceKey : "ns1-worker2" , TaskQueue : "queue1" },
440487 {WorkerInstanceKey : "ns1-worker3" , TaskQueue : "queue2" },
441488 }
442- m .upsertHeartbeats ("namespace1" , ns1Workers )
489+ m .upsertHeartbeats ("namespace1" , nil /*principal*/ , ns1Workers )
443490
444491 // Add 2 workers to namespace2
445492 ns2Workers := []* workerpb.WorkerHeartbeat {
446493 {WorkerInstanceKey : "ns2-worker1" , TaskQueue : "queue3" },
447494 {WorkerInstanceKey : "ns2-worker2" , TaskQueue : "queue3" },
448495 }
449- m .upsertHeartbeats ("namespace2" , ns2Workers )
496+ m .upsertHeartbeats ("namespace2" , nil /*principal*/ , ns2Workers )
450497
451498 // Verify functional behavior first
452499 ns1List := m .filterWorkers ("namespace1" , true /*includeSystemWorkers*/ ,alwaysTrue )
@@ -504,7 +551,7 @@ func TestEvictLoopRecordsUtilizationMetric(t *testing.T) {
504551 for i := 1 ; i <= 3 ; i ++ {
505552 key := fmt .Sprintf ("worker%d" , i )
506553 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key }
507- m .upsertHeartbeats ("ns" , []* workerpb.WorkerHeartbeat {hb })
554+ m .upsertHeartbeats ("ns" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
508555 }
509556
510557 // Verify initial state
@@ -548,7 +595,7 @@ func BenchmarkUpdate(b *testing.B) {
548595 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : "benchWorker" }
549596 b .ResetTimer ()
550597 for i := 0 ; i < b .N ; i ++ {
551- m .upsertHeartbeats ("benchNs" , []* workerpb.WorkerHeartbeat {hb })
598+ m .upsertHeartbeats ("benchNs" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
552599 }
553600}
554601
@@ -569,7 +616,7 @@ func BenchmarkListNamespace(b *testing.B) {
569616 for i := range 1000 {
570617 key := fmt .Sprintf ("worker%d" , i )
571618 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key }
572- m .upsertHeartbeats ("benchNs" , []* workerpb.WorkerHeartbeat {hb })
619+ m .upsertHeartbeats ("benchNs" , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
573620 }
574621 b .ResetTimer ()
575622 for i := 0 ; i < b .N ; i ++ {
@@ -604,7 +651,7 @@ func BenchmarkRandomUpdate(b *testing.B) {
604651 for i := range totalHeartbeats {
605652 key := fmt .Sprintf ("%s-worker%d" , ns , i )
606653 hb := & workerpb.WorkerHeartbeat {WorkerInstanceKey : key , CurrentStickyCacheSize : int32 (i )}
607- m .upsertHeartbeats (ns , []* workerpb.WorkerHeartbeat {hb })
654+ m .upsertHeartbeats (ns , nil /*principal*/ , []* workerpb.WorkerHeartbeat {hb })
608655 pairs = append (pairs , pair {ns : ns , hb : hb })
609656 }
610657 }
@@ -613,7 +660,7 @@ func BenchmarkRandomUpdate(b *testing.B) {
613660 b .ResetTimer ()
614661 for i := 0 ; i < b .N ; i ++ {
615662 p := pairs [r .Intn (len (pairs ))]
616- m .upsertHeartbeats (p .ns , []* workerpb.WorkerHeartbeat {p .hb })
663+ m .upsertHeartbeats (p .ns , nil /*principal*/ , []* workerpb.WorkerHeartbeat {p .hb })
617664 }
618665}
619666
@@ -662,7 +709,7 @@ func TestActivitySlotsMetric(t *testing.T) {
662709
663710 testNamespace := tv .NamespaceID ()
664711 testNamespaceName := namespace .Name (testNamespace + "_name" )
665- m .RecordWorkerHeartbeats (testNamespace , testNamespaceName , []* workerpb.WorkerHeartbeat {worker1 , worker2 , worker3 })
712+ m .RecordWorkerHeartbeats (testNamespace , testNamespaceName , nil /*principal*/ , []* workerpb.WorkerHeartbeat {worker1 , worker2 , worker3 })
666713
667714 // Verify activity slots metrics
668715 snapshot := capture .Snapshot ()
@@ -739,7 +786,7 @@ func TestPluginMetricsExported(t *testing.T) {
739786
740787 testNamespace := tv .NamespaceID ()
741788 testNamespaceName := namespace .Name (testNamespace + "_name" )
742- m .RecordWorkerHeartbeats (testNamespace , testNamespaceName , []* workerpb.WorkerHeartbeat {worker1 , worker2 , worker3 })
789+ m .RecordWorkerHeartbeats (testNamespace , testNamespaceName , nil /*principal*/ , []* workerpb.WorkerHeartbeat {worker1 , worker2 , worker3 })
743790
744791 // Verify plugin metrics - should have exactly 3 recordings despite plugin-a being in both workers
745792 snapshot := capture .Snapshot ()
@@ -801,7 +848,7 @@ func TestPluginMetricsDisabled(t *testing.T) {
801848 // Upsert heartbeats for test namespace
802849 testNamespace := tv .NamespaceID ()
803850 testNamespaceName := namespace .Name (testNamespace + "_name" )
804- m .RecordWorkerHeartbeats (testNamespace , testNamespaceName , []* workerpb.WorkerHeartbeat {worker1 })
851+ m .RecordWorkerHeartbeats (testNamespace , testNamespaceName , nil /*principal*/ , []* workerpb.WorkerHeartbeat {worker1 })
805852
806853 // Verify no plugin metrics are recorded when disabled
807854 snapshot := capture .Snapshot ()
0 commit comments