Skip to content

Commit 4495de9

Browse files
authored
Fix poller reduce logic for pollers with different identities (#3057)
* Fix poller reduce for different identities * Check if identity exists
1 parent f8b3869 commit 4495de9

2 files changed

Lines changed: 201 additions & 105 deletions

File tree

src/lib/utilities/reduce-poller-types.test.ts

Lines changed: 154 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ describe('reducePollerTypes', () => {
3737
});
3838
});
3939

40-
it('should combine activity and workflow pollers', () => {
40+
it('should combine activity and workflow pollers with same identity', () => {
4141
const result = reducePollerTypes({
4242
activityPollers: {
4343
pollers: [{ identity: '1', lastAccessTime: 2 }],
@@ -57,14 +57,14 @@ describe('reducePollerTypes', () => {
5757
pollers: [
5858
{
5959
identity: '1',
60-
lastAccessTime: 2,
60+
lastAccessTime: 3,
6161
taskQueueTypes: ['WORKFLOW', 'ACTIVITY'],
6262
},
6363
],
6464
});
6565
});
6666

67-
it('should combine nexus and workflow pollers', () => {
67+
it('should combine nexus and workflow pollers with same identity', () => {
6868
const result = reducePollerTypes({
6969
activityPollers: {
7070
versioningInfo,
@@ -84,14 +84,14 @@ describe('reducePollerTypes', () => {
8484
pollers: [
8585
{
8686
identity: '1',
87-
lastAccessTime: 2,
87+
lastAccessTime: 3,
8888
taskQueueTypes: ['WORKFLOW', 'NEXUS'],
8989
},
9090
],
9191
});
9292
});
9393

94-
it('should combine all poller types', () => {
94+
it('should combine all poller types with same identity', () => {
9595
const result = reducePollerTypes({
9696
activityPollers: {
9797
pollers: [{ identity: '1', lastAccessTime: 2 }],
@@ -112,7 +112,7 @@ describe('reducePollerTypes', () => {
112112
pollers: [
113113
{
114114
identity: '1',
115-
lastAccessTime: 2,
115+
lastAccessTime: 4,
116116
taskQueueTypes: ['WORKFLOW', 'ACTIVITY', 'NEXUS'],
117117
},
118118
],
@@ -131,4 +131,152 @@ describe('reducePollerTypes', () => {
131131
pollers: [],
132132
});
133133
});
134+
135+
it('should return separate workers when identities differ', () => {
136+
const result = reducePollerTypes({
137+
workflowPollers: {
138+
pollers: [
139+
{
140+
identity: '124@Mac.local.net',
141+
lastAccessTime: '2025-12-10T15:55:38.544049618Z',
142+
},
143+
],
144+
versioningInfo,
145+
},
146+
activityPollers: {
147+
pollers: [
148+
{
149+
identity: '345@Mac.local.net',
150+
lastAccessTime: '2025-12-10T15:55:39.802086504Z',
151+
},
152+
],
153+
versioningInfo,
154+
},
155+
nexusPollers: {
156+
pollers: [
157+
{
158+
identity: '678@Mac.local.net',
159+
lastAccessTime: '2025-12-10T15:55:40.000000000Z',
160+
},
161+
],
162+
versioningInfo,
163+
},
164+
});
165+
expect(result.pollers).toHaveLength(3);
166+
expect(result.pollers).toContainEqual({
167+
identity: '124@Mac.local.net',
168+
lastAccessTime: '2025-12-10T15:55:38.544049618Z',
169+
taskQueueTypes: ['WORKFLOW'],
170+
});
171+
expect(result.pollers).toContainEqual({
172+
identity: '345@Mac.local.net',
173+
lastAccessTime: '2025-12-10T15:55:39.802086504Z',
174+
taskQueueTypes: ['ACTIVITY'],
175+
});
176+
expect(result.pollers).toContainEqual({
177+
identity: '678@Mac.local.net',
178+
lastAccessTime: '2025-12-10T15:55:40.000000000Z',
179+
taskQueueTypes: ['NEXUS'],
180+
});
181+
});
182+
183+
it('should handle mix of shared and different identities', () => {
184+
const result = reducePollerTypes({
185+
workflowPollers: {
186+
pollers: [
187+
{ identity: 'shared@host', lastAccessTime: 1 },
188+
{ identity: 'workflow-only@host', lastAccessTime: 2 },
189+
],
190+
versioningInfo,
191+
},
192+
activityPollers: {
193+
pollers: [
194+
{ identity: 'shared@host', lastAccessTime: 3 },
195+
{ identity: 'activity-only@host', lastAccessTime: 4 },
196+
],
197+
versioningInfo,
198+
},
199+
nexusPollers: {
200+
pollers: [
201+
{ identity: 'shared@host', lastAccessTime: 5 },
202+
{ identity: 'nexus-only@host', lastAccessTime: 6 },
203+
],
204+
versioningInfo,
205+
},
206+
});
207+
expect(result.pollers).toHaveLength(4);
208+
expect(result.pollers).toContainEqual({
209+
identity: 'shared@host',
210+
lastAccessTime: 5,
211+
taskQueueTypes: ['WORKFLOW', 'ACTIVITY', 'NEXUS'],
212+
});
213+
expect(result.pollers).toContainEqual({
214+
identity: 'workflow-only@host',
215+
lastAccessTime: 2,
216+
taskQueueTypes: ['WORKFLOW'],
217+
});
218+
expect(result.pollers).toContainEqual({
219+
identity: 'activity-only@host',
220+
lastAccessTime: 4,
221+
taskQueueTypes: ['ACTIVITY'],
222+
});
223+
expect(result.pollers).toContainEqual({
224+
identity: 'nexus-only@host',
225+
lastAccessTime: 6,
226+
taskQueueTypes: ['NEXUS'],
227+
});
228+
});
229+
230+
it('should return only nexus pollers if only nexus pollers', () => {
231+
const result = reducePollerTypes({
232+
workflowPollers: {
233+
versioningInfo,
234+
},
235+
activityPollers: {
236+
versioningInfo,
237+
},
238+
nexusPollers: {
239+
pollers: [{ identity: '1', lastAccessTime: 1 }],
240+
versioningInfo,
241+
},
242+
});
243+
expect(result).toEqual({
244+
versioningInfo,
245+
taskQueueStatus,
246+
pollers: [
247+
{
248+
identity: '1',
249+
lastAccessTime: 1,
250+
taskQueueTypes: ['NEXUS'],
251+
},
252+
],
253+
});
254+
});
255+
256+
it('should combine nexus and activity pollers with same identity', () => {
257+
const result = reducePollerTypes({
258+
workflowPollers: {
259+
versioningInfo,
260+
},
261+
activityPollers: {
262+
pollers: [{ identity: '1', lastAccessTime: 2 }],
263+
versioningInfo,
264+
},
265+
nexusPollers: {
266+
pollers: [{ identity: '1', lastAccessTime: 3 }],
267+
versioningInfo,
268+
},
269+
});
270+
expect(result).toEqual({
271+
versioningInfo,
272+
taskQueueStatus,
273+
pollers: [
274+
{
275+
identity: '1',
276+
lastAccessTime: 3,
277+
taskQueueTypes: ['ACTIVITY', 'NEXUS'],
278+
},
279+
],
280+
});
281+
});
134282
});

src/lib/utilities/reduce-poller-types.ts

Lines changed: 47 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import type {
2-
Poller,
32
PollerWithTaskQueueTypes,
43
TaskQueueType,
54
} from '$lib/services/pollers-service';
65
import type { PollerInfo, TaskQueueResponse } from '$lib/types';
76

8-
type PollersData = {
9-
[key: string]: Poller;
10-
};
11-
127
export const reducePollerTypes = ({
138
activityPollers,
149
nexusPollers,
@@ -18,102 +13,55 @@ export const reducePollerTypes = ({
1813
nexusPollers: TaskQueueResponse;
1914
workflowPollers: TaskQueueResponse;
2015
}): TaskQueueResponse => {
21-
if (!workflowPollers?.pollers) workflowPollers.pollers = [];
22-
if (!activityPollers?.pollers) activityPollers.pollers = [];
23-
if (!nexusPollers?.pollers) nexusPollers.pollers = [];
24-
25-
activityPollers.pollers.forEach((poller: PollerWithTaskQueueTypes) => {
26-
poller.taskQueueTypes = ['ACTIVITY'];
27-
});
28-
29-
workflowPollers.pollers.forEach((poller: PollerWithTaskQueueTypes) => {
30-
poller.taskQueueTypes = ['WORKFLOW'];
31-
});
32-
33-
nexusPollers.pollers.forEach((poller: PollerWithTaskQueueTypes) => {
34-
poller.taskQueueTypes = ['NEXUS'];
35-
});
36-
37-
const r =
38-
(type: TaskQueueType) => (pollers: PollersData, poller: PollerInfo) => {
39-
const currentPoller: Poller = pollers[poller.identity] || {
40-
lastAccessTime: undefined,
41-
taskQueueTypes: [],
42-
};
43-
44-
pollers[poller.identity] = {
45-
lastAccessTime:
46-
!currentPoller.lastAccessTime ||
47-
currentPoller.lastAccessTime < poller.lastAccessTime
48-
? poller.lastAccessTime
49-
: currentPoller.lastAccessTime,
50-
taskQueueTypes: currentPoller.taskQueueTypes.concat([type]),
51-
};
52-
53-
return pollers;
54-
};
55-
56-
activityPollers.pollers.filter((pollerA: PollerWithTaskQueueTypes) =>
57-
workflowPollers.pollers.some((pollerW: PollerWithTaskQueueTypes) => {
58-
if (pollerA.identity === pollerW.identity) {
59-
pollerA.taskQueueTypes = [
60-
...pollerW.taskQueueTypes,
61-
...pollerA.taskQueueTypes,
62-
];
63-
return pollerA;
16+
const workflowPollersList = workflowPollers?.pollers ?? [];
17+
const activityPollersList = activityPollers?.pollers ?? [];
18+
const nexusPollersList = nexusPollers?.pollers ?? [];
19+
20+
const pollerMap = new Map<
21+
string,
22+
{
23+
poller: PollerInfo;
24+
taskQueueTypes: TaskQueueType[];
25+
}
26+
>();
27+
28+
const addPoller = (poller: PollerInfo, type: TaskQueueType) => {
29+
if (poller.identity) {
30+
const existing = pollerMap.get(poller.identity);
31+
if (existing) {
32+
existing.taskQueueTypes.push(type);
33+
if (poller?.lastAccessTime > existing?.poller?.lastAccessTime) {
34+
existing.poller = poller;
35+
}
36+
} else {
37+
pollerMap.set(poller.identity, {
38+
poller,
39+
taskQueueTypes: [type],
40+
});
6441
}
65-
}),
66-
);
67-
68-
activityPollers.pollers.filter((pollerA: PollerWithTaskQueueTypes) =>
69-
nexusPollers.pollers.some((pollerN: PollerWithTaskQueueTypes) => {
70-
if (pollerN.identity === pollerA.identity) {
71-
pollerA.taskQueueTypes = [
72-
...pollerA.taskQueueTypes,
73-
...pollerN.taskQueueTypes,
74-
];
75-
return pollerA;
76-
}
77-
}),
78-
);
79-
80-
nexusPollers.pollers.filter((pollerN: PollerWithTaskQueueTypes) =>
81-
workflowPollers.pollers.some((pollerW: PollerWithTaskQueueTypes) => {
82-
if (pollerN.identity === pollerW.identity) {
83-
pollerN.taskQueueTypes = [
84-
...pollerW.taskQueueTypes,
85-
...pollerN.taskQueueTypes,
86-
];
87-
return pollerN;
88-
}
89-
}),
90-
);
91-
92-
activityPollers.pollers?.reduce(
93-
r('ACTIVITY'),
94-
nexusPollers.pollers?.reduce(
95-
r('NEXUS'),
96-
workflowPollers.pollers.reduce(r('WORKFLOW'), {}),
97-
),
98-
);
99-
100-
const pollers = activityPollers.pollers.length
101-
? activityPollers.pollers
102-
: !nexusPollers.pollers.length
103-
? workflowPollers.pollers
104-
: nexusPollers.pollers;
105-
106-
const taskQueueStatus = activityPollers.pollers.length
107-
? nexusPollers.taskQueueStatus
108-
: !nexusPollers.pollers.length
109-
? workflowPollers.taskQueueStatus
110-
: nexusPollers.taskQueueStatus;
42+
}
43+
};
11144

112-
const versioningInfo = activityPollers.pollers.length
113-
? nexusPollers.versioningInfo
114-
: !nexusPollers.pollers.length
115-
? workflowPollers.versioningInfo
116-
: nexusPollers.versioningInfo;
45+
workflowPollersList.forEach((poller) => addPoller(poller, 'WORKFLOW'));
46+
activityPollersList.forEach((poller) => addPoller(poller, 'ACTIVITY'));
47+
nexusPollersList.forEach((poller) => addPoller(poller, 'NEXUS'));
48+
49+
const pollers: PollerWithTaskQueueTypes[] = Array.from(
50+
pollerMap.values(),
51+
).map(({ poller, taskQueueTypes }) => ({
52+
...poller,
53+
taskQueueTypes,
54+
}));
55+
56+
const versioningInfo =
57+
activityPollers?.versioningInfo ??
58+
workflowPollers?.versioningInfo ??
59+
nexusPollers?.versioningInfo;
60+
61+
const taskQueueStatus =
62+
activityPollers?.taskQueueStatus ??
63+
workflowPollers?.taskQueueStatus ??
64+
nexusPollers?.taskQueueStatus;
11765

11866
return {
11967
pollers,

0 commit comments

Comments
 (0)