Skip to content

Commit 3a92940

Browse files
rkannan82 and claude authored
Exclude system workers from ListWorkers by default (#10132)
## What

Exclude internal system workers from ListWorkers API results by default. Users can opt in to seeing them via `include_system_workers` on the request.

Depends on API change: temporalio/api#777

## Why

System workers (per-namespace workers for scheduler, batcher, worker deployment, etc.) appear in ListWorkers results, which is confusing for users who only care about their own workers.

## How did you test it?

Unit tests.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ac7023a commit 3a92940

6 files changed

Lines changed: 129 additions & 30 deletions

File tree

common/primitives/task_queues.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package primitives
22

33
import (
44
"fmt"
5+
"strings"
56

67
enumspb "go.temporal.io/api/enums/v1"
78
"go.temporal.io/api/serviceerror"
@@ -33,6 +34,13 @@ func IsInternalTaskQueueKind(kind enumspb.TaskQueueKind) bool {
3334
return false
3435
}
3536

37+
const internalTaskQueuePrefix = "temporal-sys-"
38+
39+
// IsInternalTaskQueue returns true if the task queue name belongs to an internal system task queue.
40+
func IsInternalTaskQueue(taskQueue string) bool {
41+
return strings.HasPrefix(taskQueue, internalTaskQueuePrefix)
42+
}
43+
3644
func IsInternalPerNsTaskQueue(taskQueue string) bool {
3745
return taskQueue == PerNSWorkerTaskQueue
3846
}

service/matching/handler.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -606,9 +606,10 @@ func (h *Handler) ListWorkers(
606606
nsID := namespace.ID(request.GetNamespaceId())
607607
listRequest := request.GetListRequest()
608608
resp, err := h.workersRegistry.ListWorkers(nsID, workers.ListWorkersParams{
609-
Query: listRequest.GetQuery(),
610-
PageSize: int(listRequest.GetPageSize()),
611-
NextPageToken: listRequest.GetNextPageToken(),
609+
Query: listRequest.GetQuery(),
610+
PageSize: int(listRequest.GetPageSize()),
611+
NextPageToken: listRequest.GetNextPageToken(),
612+
IncludeSystemWorkers: listRequest.GetIncludeSystemWorkers(),
612613
})
613614
if err != nil {
614615
return nil, err

service/matching/workers/registry.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,10 @@ import (
88
type (
99
// ListWorkersParams contains parameters for listing workers.
1010
ListWorkersParams struct {
11-
Query string
12-
PageSize int
13-
NextPageToken []byte // Opaque token from a previous response to resume pagination.
11+
Query string
12+
PageSize int
13+
NextPageToken []byte // Opaque token from a previous response to resume pagination.
14+
IncludeSystemWorkers bool
1415
}
1516

1617
// ListWorkersResponse contains the result of listing workers.

service/matching/workers/registry_impl.go

Lines changed: 24 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ import (
1616
"go.temporal.io/server/common/dynamicconfig"
1717
"go.temporal.io/server/common/metrics"
1818
"go.temporal.io/server/common/namespace"
19+
"go.temporal.io/server/common/primitives"
1920
"go.uber.org/fx"
2021
)
2122

@@ -29,10 +30,11 @@ type listWorkersPageToken struct {
2930
type (
3031
// entry wraps a WorkerHeartbeat along with its namespace and eviction metadata.
3132
entry struct {
32-
nsID namespace.ID
33-
hb *workerpb.WorkerHeartbeat
34-
lastSeen time.Time
35-
elem *list.Element
33+
nsID namespace.ID
34+
hb *workerpb.WorkerHeartbeat
35+
lastSeen time.Time
36+
elem *list.Element
37+
isSystemWorker bool
3638
}
3739
// bucket holds part of the keyspace: a map from namespace → (map of instanceKey → entry),
3840
// plus a recency list for eviction.
@@ -105,16 +107,20 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
105107
continue
106108
}
107109

110+
isSystemWorker := primitives.IsInternalTaskQueue(hb.GetTaskQueue())
111+
108112
// Normal upsert
109113
if e, exists := mp[key]; exists {
110114
e.hb = hb
111115
e.lastSeen = now
116+
e.isSystemWorker = isSystemWorker
112117
b.order.MoveToBack(e.elem)
113118
} else {
114119
e = &entry{
115-
nsID: nsID,
116-
hb: hb,
117-
lastSeen: now,
120+
nsID: nsID,
121+
hb: hb,
122+
lastSeen: now,
123+
isSystemWorker: isSystemWorker,
118124
}
119125
e.elem = b.order.PushBack(e)
120126
mp[key] = e
@@ -126,9 +132,11 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
126132
}
127133

128134
// filterWorkers returns all WorkerHeartbeats in a namespace
129-
// for which predicate(hb) returns true.
135+
// for which predicate(hb) returns true. System workers are excluded
136+
// unless includeSystemWorkers is true.
130137
func (b *bucket) filterWorkers(
131138
nsID namespace.ID,
139+
includeSystemWorkers bool,
132140
predicate func(*workerpb.WorkerHeartbeat) bool,
133141
) []*workerpb.WorkerHeartbeat {
134142
b.mu.Lock()
@@ -140,6 +148,9 @@ func (b *bucket) filterWorkers(
140148
}
141149
out := make([]*workerpb.WorkerHeartbeat, 0, len(mp))
142150
for _, e := range mp {
151+
if !includeSystemWorkers && e.isSystemWorker {
152+
continue
153+
}
143154
if predicate(e.hb) {
144155
out = append(out, e.hb)
145156
}
@@ -280,17 +291,19 @@ func (m *registryImpl) recordEvictionMetric() {
280291
}
281292

282293
// filterWorkers returns all WorkerHeartbeats in a namespace
283-
// for which predicate(hb) returns true.
294+
// for which predicate(hb) returns true. System workers are excluded
295+
// unless includeSystemWorkers is true.
284296
func (m *registryImpl) filterWorkers(
285297
nsID namespace.ID,
298+
includeSystemWorkers bool,
286299
predicate func(*workerpb.WorkerHeartbeat) bool,
287300
) []*workerpb.WorkerHeartbeat {
288301
b := m.getBucket(nsID)
289302

290303
if b == nil {
291304
return nil
292305
}
293-
return b.filterWorkers(nsID, predicate)
306+
return b.filterWorkers(nsID, includeSystemWorkers, predicate)
294307
}
295308

296309
// evictLoop periodically triggers TTL and capacity-based eviction.
@@ -384,7 +397,7 @@ func (m *registryImpl) ListWorkers(nsID namespace.ID, params ListWorkersParams)
384397
}
385398

386399
// Get all matching workers and paginate
387-
workers := m.filterWorkers(nsID, predicate)
400+
workers := m.filterWorkers(nsID, params.IncludeSystemWorkers, predicate)
388401
return paginateWorkers(workers, params.PageSize, params.NextPageToken)
389402
}
390403

service/matching/workers/registry_impl_test.go

Lines changed: 38 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
1112
enumspb "go.temporal.io/api/enums/v1"
1213
workerpb "go.temporal.io/api/worker/v1"
1314
"go.temporal.io/server/common/dynamicconfig"
@@ -40,15 +41,15 @@ func TestUpdateAndListNamespace(t *testing.T) {
4041
defer m.Stop()
4142

4243
// No entries initially
43-
list := m.filterWorkers("ns1", alwaysTrue)
44+
list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
4445
assert.Empty(t, list, "expected empty list before updates")
4546

4647
// Add some heartbeats
4748
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerA", Status: enumspb.WORKER_STATUS_RUNNING}
4849
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING}
4950
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
5051

51-
list = m.filterWorkers("ns1", alwaysTrue)
52+
list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
5253
// Order is not guaranteed; check contents by keys
5354
keys := []string{list[0].WorkerInstanceKey, list[1].WorkerInstanceKey}
5455
assert.Contains(t, keys, "workerA")
@@ -94,7 +95,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
9495
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
9596

9697
// Verify both workers are registered
97-
list := m.filterWorkers("ns1", alwaysTrue)
98+
list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
9899
assert.Len(t, list, 2, "both workers should be registered")
99100
assert.Equal(t, int64(2), m.total.Load(), "total should be 2")
100101

@@ -103,7 +104,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
103104
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown})
104105

105106
// Verify only worker1 is removed, worker2 remains
106-
list = m.filterWorkers("ns1", alwaysTrue)
107+
list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
107108
assert.Len(t, list, 1, "only one worker should remain")
108109
assert.Equal(t, "worker2", list[0].WorkerInstanceKey, "worker2 should remain")
109110
assert.Equal(t, int64(1), m.total.Load(), "total should be 1 after shutdown")
@@ -141,7 +142,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) {
141142
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb})
142143

143144
// Verify nothing happened
144-
list := m.filterWorkers("ns1", alwaysTrue)
145+
list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue)
145146
assert.Empty(t, list, "no workers should exist")
146147
assert.Zero(t, m.total.Load(), "total should remain 0")
147148
}
@@ -180,12 +181,36 @@ func TestListNamespacePredicate(t *testing.T) {
180181

181182
for _, tc := range tests {
182183
t.Run(tc.name, func(t *testing.T) {
183-
list := m.filterWorkers("ns", tc.pred)
184+
list := m.filterWorkers("ns", true /* includeSystemWorkers */, tc.pred)
184185
assert.Len(t, list, tc.wantCount)
185186
})
186187
}
187188
}
188189

190+
func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
191+
m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
192+
defer m.Stop()
193+
194+
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{
195+
{WorkerInstanceKey: "user-1", TaskQueue: "my-queue"},
196+
{WorkerInstanceKey: "user-2", TaskQueue: "my-queue"},
197+
{WorkerInstanceKey: "sys-1", TaskQueue: "temporal-sys-per-ns-tq"},
198+
})
199+
200+
t.Run("includeSystemWorkers=true returns all", func(t *testing.T) {
201+
list := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue)
202+
require.Len(t, list, 3)
203+
})
204+
205+
t.Run("includeSystemWorkers=false excludes system workers", func(t *testing.T) {
206+
list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue)
207+
require.Len(t, list, 2)
208+
for _, w := range list {
209+
require.NotEqual(t, "sys-1", w.WorkerInstanceKey)
210+
}
211+
})
212+
}
213+
189214
func TestEvictByTTL(t *testing.T) {
190215
// Use capture handler to verify metrics
191216
captureHandler := metricstest.NewCaptureHandler()
@@ -216,7 +241,7 @@ func TestEvictByTTL(t *testing.T) {
216241
// Perform eviction
217242
m.evictByTTL()
218243

219-
list := m.filterWorkers("ns", alwaysTrue)
244+
list := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue)
220245
assert.Empty(t, list, "entry should be evicted by TTL")
221246
assert.Zero(t, m.total.Load(), "total counter should be decremented")
222247

@@ -261,7 +286,7 @@ func TestEvictByCapacity(t *testing.T) {
261286
m.evictByCapacity()
262287

263288
// Ensure we evicted down to maxItems
264-
remaining := m.filterWorkers("ns", alwaysTrue)
289+
remaining := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue)
265290
assert.Len(t, remaining, int(maxItems), "should evict down to maxItems")
266291
assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total counter should not exceed maxItems")
267292

@@ -323,7 +348,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
323348
m.evictByCapacity()
324349

325350
// All entries should still be there (protected by minEvictAge)
326-
workers := m.filterWorkers("ns", alwaysTrue)
351+
workers := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue)
327352
assert.Len(t, workers, 3, "all entries should be protected by minEvictAge")
328353
assert.Equal(t, int64(3), m.total.Load(), "should still exceed maxItems due to protection")
329354

@@ -374,7 +399,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) {
374399
m.evictByCapacity()
375400

376401
// Should have evicted down to maxItems
377-
workers := m.filterWorkers("ns", alwaysTrue)
402+
workers := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue)
378403
assert.LessOrEqual(t, len(workers), int(maxItems), "should evict down to maxItems")
379404
assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total should be within limits")
380405

@@ -424,10 +449,10 @@ func TestMultipleNamespaces(t *testing.T) {
424449
m.upsertHeartbeats("namespace2", ns2Workers)
425450

426451
// Verify functional behavior first
427-
ns1List := m.filterWorkers("namespace1", alwaysTrue)
452+
ns1List := m.filterWorkers("namespace1", true /* includeSystemWorkers */, alwaysTrue)
428453
assert.Len(t, ns1List, 3, "namespace1 should have 3 workers")
429454

430-
ns2List := m.filterWorkers("namespace2", alwaysTrue)
455+
ns2List := m.filterWorkers("namespace2", true /* includeSystemWorkers */, alwaysTrue)
431456
assert.Len(t, ns2List, 2, "namespace2 should have 2 workers")
432457

433458
assert.Equal(t, int64(5), m.total.Load(), "total should be 5 workers across namespaces")
@@ -548,7 +573,7 @@ func BenchmarkListNamespace(b *testing.B) {
548573
}
549574
b.ResetTimer()
550575
for i := 0; i < b.N; i++ {
551-
_ = m.filterWorkers("benchNs", alwaysTrue)
576+
_ = m.filterWorkers("benchNs", true /* includeSystemWorkers */, alwaysTrue)
552577
}
553578
}
554579

service/matching/workers/registry_test.go

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -554,6 +554,57 @@ func TestRegistryImpl_ListWorkersInvalidPageToken(t *testing.T) {
554554
assert.Contains(t, err.Error(), "invalid next_page_token")
555555
}
556556

557+
func TestRegistryImpl_ListWorkersExcludesSystemWorkers(t *testing.T) {
558+
r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
559+
560+
// Add workers on a user task queue and a system (internal) task queue.
561+
r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{
562+
{WorkerInstanceKey: "user-worker-1", TaskQueue: "my-queue"},
563+
{WorkerInstanceKey: "user-worker-2", TaskQueue: "my-queue"},
564+
{WorkerInstanceKey: "sys-worker-1", TaskQueue: "temporal-sys-per-ns-tq"},
565+
})
566+
567+
t.Run("excludes system workers by default", func(t *testing.T) {
568+
resp, err := r.ListWorkers("ns1", ListWorkersParams{})
569+
require.NoError(t, err)
570+
require.Len(t, resp.Workers, 2, "should only return user workers")
571+
572+
workerKeys := make([]string, len(resp.Workers))
573+
for i, w := range resp.Workers {
574+
workerKeys[i] = w.WorkerInstanceKey
575+
}
576+
require.ElementsMatch(t, []string{"user-worker-1", "user-worker-2"}, workerKeys)
577+
})
578+
579+
t.Run("includes system workers when requested", func(t *testing.T) {
580+
resp, err := r.ListWorkers("ns1", ListWorkersParams{IncludeSystemWorkers: true})
581+
require.NoError(t, err)
582+
require.Len(t, resp.Workers, 3, "should return all workers including system")
583+
584+
workerKeys := make([]string, len(resp.Workers))
585+
for i, w := range resp.Workers {
586+
workerKeys[i] = w.WorkerInstanceKey
587+
}
588+
require.ElementsMatch(t, []string{"user-worker-1", "user-worker-2", "sys-worker-1"}, workerKeys)
589+
})
590+
591+
t.Run("pagination excludes system workers from page counts", func(t *testing.T) {
592+
// Page 1 (sorted: "user-worker-1" comes first)
593+
resp1, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 1})
594+
require.NoError(t, err)
595+
require.Len(t, resp1.Workers, 1)
596+
require.Equal(t, "user-worker-1", resp1.Workers[0].WorkerInstanceKey)
597+
require.NotNil(t, resp1.NextPageToken)
598+
599+
// Page 2
600+
resp2, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 1, NextPageToken: resp1.NextPageToken})
601+
require.NoError(t, err)
602+
require.Len(t, resp2.Workers, 1)
603+
require.Equal(t, "user-worker-2", resp2.Workers[0].WorkerInstanceKey)
604+
require.Nil(t, resp2.NextPageToken)
605+
})
606+
}
607+
557608
func TestRegistryImpl_RecordStorageDriverMetric(t *testing.T) {
558609
t.Run("disabled when ExternalPayloadsEnabled is false", func(t *testing.T) {
559610
captureHandler := metricstest.NewCaptureHandler()

0 commit comments

Comments
 (0)