Skip to content

Commit 5cce93d

Browse files
rkannan82 and claude committed
Store isSystemWorker at upsert time instead of checking task queue name at query time
Moves the system worker classification to the entry level so it's computed once during heartbeat recording rather than on every ListWorkers call.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 769af51 commit 5cce93d

2 files changed

Lines changed: 36 additions & 34 deletions

File tree

service/matching/workers/registry_impl.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ type listWorkersPageToken struct {
3030
type (
3131
// entry wraps a WorkerHeartbeat along with its namespace and eviction metadata.
3232
entry struct {
33-
nsID namespace.ID
34-
hb *workerpb.WorkerHeartbeat
35-
lastSeen time.Time
36-
elem *list.Element
33+
nsID namespace.ID
34+
hb *workerpb.WorkerHeartbeat
35+
lastSeen time.Time
36+
elem *list.Element
37+
isSystemWorker bool
3738
}
3839
// bucket holds part of the keyspace: a map from namespace → (map of instanceKey → entry),
3940
// plus a recency list for eviction.
@@ -106,16 +107,20 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
106107
continue
107108
}
108109

110+
isSystem := primitives.IsInternalTaskQueue(hb.GetTaskQueue())
111+
109112
// Normal upsert
110113
if e, exists := mp[key]; exists {
111114
e.hb = hb
112115
e.lastSeen = now
116+
e.isSystemWorker = isSystem
113117
b.order.MoveToBack(e.elem)
114118
} else {
115119
e = &entry{
116-
nsID: nsID,
117-
hb: hb,
118-
lastSeen: now,
120+
nsID: nsID,
121+
hb: hb,
122+
lastSeen: now,
123+
isSystemWorker: isSystem,
119124
}
120125
e.elem = b.order.PushBack(e)
121126
mp[key] = e
@@ -127,9 +132,11 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
127132
}
128133

129134
// filterWorkers returns all WorkerHeartbeats in a namespace
130-
// for which predicate(hb) returns true.
135+
// for which predicate(hb) returns true. System workers are excluded
136+
// unless includeSystemWorkers is true.
131137
func (b *bucket) filterWorkers(
132138
nsID namespace.ID,
139+
includeSystemWorkers bool,
133140
predicate func(*workerpb.WorkerHeartbeat) bool,
134141
) []*workerpb.WorkerHeartbeat {
135142
b.mu.Lock()
@@ -141,6 +148,9 @@ func (b *bucket) filterWorkers(
141148
}
142149
out := make([]*workerpb.WorkerHeartbeat, 0, len(mp))
143150
for _, e := range mp {
151+
if !includeSystemWorkers && e.isSystemWorker {
152+
continue
153+
}
144154
if predicate(e.hb) {
145155
out = append(out, e.hb)
146156
}
@@ -281,17 +291,19 @@ func (m *registryImpl) recordEvictionMetric() {
281291
}
282292

283293
// filterWorkers returns all WorkerHeartbeats in a namespace
284-
// for which predicate(hb) returns true.
294+
// for which predicate(hb) returns true. System workers are excluded
295+
// unless includeSystemWorkers is true.
285296
func (m *registryImpl) filterWorkers(
286297
nsID namespace.ID,
298+
includeSystemWorkers bool,
287299
predicate func(*workerpb.WorkerHeartbeat) bool,
288300
) []*workerpb.WorkerHeartbeat {
289301
b := m.getBucket(nsID)
290302

291303
if b == nil {
292304
return nil
293305
}
294-
return b.filterWorkers(nsID, predicate)
306+
return b.filterWorkers(nsID, includeSystemWorkers, predicate)
295307
}
296308

297309
// evictLoop periodically triggers TTL and capacity-based eviction.
@@ -384,18 +396,8 @@ func (m *registryImpl) ListWorkers(nsID namespace.ID, params ListWorkersParams)
384396
}
385397
}
386398

387-
if !params.IncludeSystemWorkers {
388-
basePredicate := predicate
389-
predicate = func(hb *workerpb.WorkerHeartbeat) bool {
390-
if primitives.IsInternalTaskQueue(hb.GetTaskQueue()) {
391-
return false
392-
}
393-
return basePredicate(hb)
394-
}
395-
}
396-
397399
// Get all matching workers and paginate
398-
workers := m.filterWorkers(nsID, predicate)
400+
workers := m.filterWorkers(nsID, params.IncludeSystemWorkers, predicate)
399401
return paginateWorkers(workers, params.PageSize, params.NextPageToken)
400402
}
401403

service/matching/workers/registry_impl_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,15 @@ func TestUpdateAndListNamespace(t *testing.T) {
4040
defer m.Stop()
4141

4242
// No entries initially
43-
list := m.filterWorkers("ns1", alwaysTrue)
43+
list := m.filterWorkers("ns1", true, alwaysTrue)
4444
assert.Empty(t, list, "expected empty list before updates")
4545

4646
// Add some heartbeats
4747
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerA", Status: enumspb.WORKER_STATUS_RUNNING}
4848
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING}
4949
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
5050

51-
list = m.filterWorkers("ns1", alwaysTrue)
51+
list = m.filterWorkers("ns1", true, alwaysTrue)
5252
// Order is not guaranteed; check contents by keys
5353
keys := []string{list[0].WorkerInstanceKey, list[1].WorkerInstanceKey}
5454
assert.Contains(t, keys, "workerA")
@@ -94,7 +94,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
9494
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})
9595

9696
// Verify both workers are registered
97-
list := m.filterWorkers("ns1", alwaysTrue)
97+
list := m.filterWorkers("ns1", true, alwaysTrue)
9898
assert.Len(t, list, 2, "both workers should be registered")
9999
assert.Equal(t, int64(2), m.total.Load(), "total should be 2")
100100

@@ -103,7 +103,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
103103
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown})
104104

105105
// Verify only worker1 is removed, worker2 remains
106-
list = m.filterWorkers("ns1", alwaysTrue)
106+
list = m.filterWorkers("ns1", true, alwaysTrue)
107107
assert.Len(t, list, 1, "only one worker should remain")
108108
assert.Equal(t, "worker2", list[0].WorkerInstanceKey, "worker2 should remain")
109109
assert.Equal(t, int64(1), m.total.Load(), "total should be 1 after shutdown")
@@ -141,7 +141,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) {
141141
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb})
142142

143143
// Verify nothing happened
144-
list := m.filterWorkers("ns1", alwaysTrue)
144+
list := m.filterWorkers("ns1", true, alwaysTrue)
145145
assert.Empty(t, list, "no workers should exist")
146146
assert.Zero(t, m.total.Load(), "total should remain 0")
147147
}
@@ -180,7 +180,7 @@ func TestListNamespacePredicate(t *testing.T) {
180180

181181
for _, tc := range tests {
182182
t.Run(tc.name, func(t *testing.T) {
183-
list := m.filterWorkers("ns", tc.pred)
183+
list := m.filterWorkers("ns", true, tc.pred)
184184
assert.Len(t, list, tc.wantCount)
185185
})
186186
}
@@ -216,7 +216,7 @@ func TestEvictByTTL(t *testing.T) {
216216
// Perform eviction
217217
m.evictByTTL()
218218

219-
list := m.filterWorkers("ns", alwaysTrue)
219+
list := m.filterWorkers("ns", true, alwaysTrue)
220220
assert.Empty(t, list, "entry should be evicted by TTL")
221221
assert.Zero(t, m.total.Load(), "total counter should be decremented")
222222

@@ -261,7 +261,7 @@ func TestEvictByCapacity(t *testing.T) {
261261
m.evictByCapacity()
262262

263263
// Ensure we evicted down to maxItems
264-
remaining := m.filterWorkers("ns", alwaysTrue)
264+
remaining := m.filterWorkers("ns", true, alwaysTrue)
265265
assert.Len(t, remaining, int(maxItems), "should evict down to maxItems")
266266
assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total counter should not exceed maxItems")
267267

@@ -323,7 +323,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
323323
m.evictByCapacity()
324324

325325
// All entries should still be there (protected by minEvictAge)
326-
workers := m.filterWorkers("ns", alwaysTrue)
326+
workers := m.filterWorkers("ns", true, alwaysTrue)
327327
assert.Len(t, workers, 3, "all entries should be protected by minEvictAge")
328328
assert.Equal(t, int64(3), m.total.Load(), "should still exceed maxItems due to protection")
329329

@@ -374,7 +374,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) {
374374
m.evictByCapacity()
375375

376376
// Should have evicted down to maxItems
377-
workers := m.filterWorkers("ns", alwaysTrue)
377+
workers := m.filterWorkers("ns", true, alwaysTrue)
378378
assert.LessOrEqual(t, len(workers), int(maxItems), "should evict down to maxItems")
379379
assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total should be within limits")
380380

@@ -424,10 +424,10 @@ func TestMultipleNamespaces(t *testing.T) {
424424
m.upsertHeartbeats("namespace2", ns2Workers)
425425

426426
// Verify functional behavior first
427-
ns1List := m.filterWorkers("namespace1", alwaysTrue)
427+
ns1List := m.filterWorkers("namespace1", true, alwaysTrue)
428428
assert.Len(t, ns1List, 3, "namespace1 should have 3 workers")
429429

430-
ns2List := m.filterWorkers("namespace2", alwaysTrue)
430+
ns2List := m.filterWorkers("namespace2", true, alwaysTrue)
431431
assert.Len(t, ns2List, 2, "namespace2 should have 2 workers")
432432

433433
assert.Equal(t, int64(5), m.total.Load(), "total should be 5 workers across namespaces")
@@ -548,7 +548,7 @@ func BenchmarkListNamespace(b *testing.B) {
548548
}
549549
b.ResetTimer()
550550
for i := 0; i < b.N; i++ {
551-
_ = m.filterWorkers("benchNs", alwaysTrue)
551+
_ = m.filterWorkers("benchNs", true, alwaysTrue)
552552
}
553553
}
554554

0 commit comments

Comments
 (0)