Skip to content

Commit 41929ea

Browse files
authored
feat(runtime): resubscribe websocket topics (#6)
1 parent c4fce01 commit 41929ea

4 files changed

Lines changed: 184 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ English | [中文](./CHANGELOG.zh-CN.md)
44

55
## [Unreleased]
66

7+
- feat(runtime): resubscribe Socket.IO runtime page topics after reconnect with replay cursor support.
78
- feat(runtime): add a Socket.IO WebSocket manager for BFF runtime page subscriptions.

CHANGELOG.zh-CN.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44

55
## [Unreleased]
66

7+
- feat(runtime): Socket.IO runtime page topic 重连后自动重订阅,并支持携带 replay cursor。
78
- feat(runtime): 新增用于 BFF runtime page 订阅的 Socket.IO WebSocket manager。

projects/runtime-adapter/src/lib/runtime-adapter.spec.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ describe('createSocketIoWebSocketManager', () => {
207207
expect(socket.connectCalls).toBe(1);
208208
expect(socket.disconnectCalls).toBe(1);
209209
expect(socket.listenerCount('runtimeManagerExecuted')).toBe(0);
210+
expect(socket.listenerCount('connect')).toBe(0);
210211
});
211212

212213
it('subscribes to BFF runtime page topics without cursor', async () => {
@@ -239,6 +240,62 @@ describe('createSocketIoWebSocketManager', () => {
239240
]);
240241
});
241242

243+
it('resubscribes active topics after socket reconnect', async () => {
244+
const socket = new FakeRuntimeSocket();
245+
const manager = createSocketIoWebSocketManager({
246+
socketFactory: () => socket,
247+
});
248+
const topic = 'tenant.tenant-a.page.orders.instance.instance-1';
249+
250+
await manager.connect();
251+
await manager.subscribe(topic, vi.fn());
252+
socket.receive('connect', undefined);
253+
254+
expect(socket.emits).toEqual([
255+
{
256+
event: 'subscribePage',
257+
payload: {
258+
tenantId: 'tenant-a',
259+
pageId: 'orders',
260+
pageInstanceId: 'instance-1',
261+
},
262+
},
263+
{
264+
event: 'subscribePage',
265+
payload: {
266+
tenantId: 'tenant-a',
267+
pageId: 'orders',
268+
pageInstanceId: 'instance-1',
269+
},
270+
},
271+
]);
272+
});
273+
274+
it('resubscribes a topic once when multiple handlers share it', async () => {
275+
const socket = new FakeRuntimeSocket();
276+
const manager = createSocketIoWebSocketManager({
277+
socketFactory: () => socket,
278+
});
279+
const topic = 'tenant.tenant-a.page.orders.instance.instance-1';
280+
281+
await manager.connect();
282+
await manager.subscribe(topic, vi.fn());
283+
await manager.subscribe(topic, vi.fn());
284+
socket.emits.length = 0;
285+
socket.receive('connect', undefined);
286+
287+
expect(socket.emits).toEqual([
288+
{
289+
event: 'subscribePage',
290+
payload: {
291+
tenantId: 'tenant-a',
292+
pageId: 'orders',
293+
pageInstanceId: 'instance-1',
294+
},
295+
},
296+
]);
297+
});
298+
242299
it('subscribes with replay cursor options', async () => {
243300
const socket = new FakeRuntimeSocket();
244301
const manager = createSocketIoWebSocketManager({
@@ -269,6 +326,76 @@ describe('createSocketIoWebSocketManager', () => {
269326
]);
270327
});
271328

329+
it('uses the latest replay cursor when resubscribing after reconnect', async () => {
330+
const socket = new FakeRuntimeSocket();
331+
const manager = createSocketIoWebSocketManager({
332+
socketFactory: () => socket,
333+
});
334+
const topic = 'tenant.tenant-a.page.orders.instance.instance-1';
335+
const handler = vi.fn();
336+
337+
await subscribeWithOptions(manager, topic, handler, {
338+
afterReplayId: '42-0',
339+
});
340+
socket.receive('runtimeManagerExecuted', {
341+
type: 'runtime.manager.executed',
342+
topic,
343+
page: {
344+
tenantId: 'tenant-a',
345+
pageId: 'orders',
346+
pageInstanceId: 'instance-1',
347+
},
348+
replayId: '43-0',
349+
patchState: {},
350+
refreshedDatasourceIds: [],
351+
runActionIds: [],
352+
});
353+
socket.emits.length = 0;
354+
socket.receive('connect', undefined);
355+
356+
expect(socket.emits).toEqual([
357+
{
358+
event: 'subscribePage',
359+
payload: {
360+
tenantId: 'tenant-a',
361+
pageId: 'orders',
362+
pageInstanceId: 'instance-1',
363+
afterReplayId: '43-0',
364+
},
365+
},
366+
]);
367+
});
368+
369+
it('falls back to the original replay cursor when no newer event was received', async () => {
370+
const socket = new FakeRuntimeSocket();
371+
const manager = createSocketIoWebSocketManager({
372+
socketFactory: () => socket,
373+
});
374+
375+
await subscribeWithOptions(
376+
manager,
377+
'tenant.tenant-a.page.orders.instance.instance-1',
378+
vi.fn(),
379+
{
380+
afterReplayId: '42-0',
381+
},
382+
);
383+
socket.emits.length = 0;
384+
socket.receive('connect', undefined);
385+
386+
expect(socket.emits).toEqual([
387+
{
388+
event: 'subscribePage',
389+
payload: {
390+
tenantId: 'tenant-a',
391+
pageId: 'orders',
392+
pageInstanceId: 'instance-1',
393+
afterReplayId: '42-0',
394+
},
395+
},
396+
]);
397+
});
398+
272399
it('dispatches manager events only to matching topic handlers', async () => {
273400
const socket = new FakeRuntimeSocket();
274401
const manager = createSocketIoWebSocketManager({
@@ -309,6 +436,8 @@ describe('createSocketIoWebSocketManager', () => {
309436
await manager.connect();
310437
await manager.subscribe(topic, handler);
311438
await manager.unsubscribe(topic, handler);
439+
socket.emits.length = 0;
440+
socket.receive('connect', undefined);
312441
socket.receive('runtimeManagerExecuted', {
313442
type: 'runtime.manager.executed',
314443
topic,
@@ -323,6 +452,26 @@ describe('createSocketIoWebSocketManager', () => {
323452
});
324453

325454
expect(handler).not.toHaveBeenCalled();
455+
expect(socket.emits).toEqual([]);
456+
});
457+
458+
it('clears subscriptions and replay cursors on disconnect', async () => {
459+
const socket = new FakeRuntimeSocket();
460+
const manager = createSocketIoWebSocketManager({
461+
socketFactory: () => socket,
462+
});
463+
const topic = 'tenant.tenant-a.page.orders.instance.instance-1';
464+
465+
await subscribeWithOptions(manager, topic, vi.fn(), {
466+
afterReplayId: '42-0',
467+
});
468+
await manager.disconnect();
469+
socket.emits.length = 0;
470+
socket.receive('connect', undefined);
471+
472+
expect(socket.emits).toEqual([]);
473+
expect(socket.listenerCount('runtimeManagerExecuted')).toBe(0);
474+
expect(socket.listenerCount('connect')).toBe(0);
326475
});
327476
});
328477

projects/runtime-adapter/src/lib/runtime-adapter.ts

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ export function createSocketIoWebSocketManager(
269269
options: SocketIoWebSocketManagerOptions = {},
270270
): NgxLowcodeWebSocketManager {
271271
const subscriptions = new Map<string, Set<NgxLowcodeWebSocketEventHandler>>();
272+
const originalReplayIdsByTopic = new Map<string, string>();
273+
const latestReplayIdsByTopic = new Map<string, string>();
272274
const socketUrl = resolveSocketUrl(options);
273275
const socketOptions: Partial<ManagerOptions & SocketOptions> = {
274276
autoConnect: false,
@@ -283,6 +285,10 @@ export function createSocketIoWebSocketManager(
283285
return;
284286
}
285287

288+
if (event.replayId) {
289+
latestReplayIdsByTopic.set(event.topic, event.replayId);
290+
}
291+
286292
const handlers = subscriptions.get(event.topic);
287293
if (!handlers) {
288294
return;
@@ -293,6 +299,12 @@ export function createSocketIoWebSocketManager(
293299
}
294300
};
295301

302+
const handleConnect = (): void => {
303+
for (const channel of subscriptions.keys()) {
304+
emitSubscribePage(channel);
305+
}
306+
};
307+
296308
return {
297309
connect: () => {
298310
getSocket().connect();
@@ -304,25 +316,31 @@ export function createSocketIoWebSocketManager(
304316
) => {
305317
const page = parseRuntimePageTopic(channel);
306318
const handlers = subscriptions.get(channel) ?? new Set<NgxLowcodeWebSocketEventHandler>();
319+
const isFirstTopicHandler = handlers.size === 0;
307320
handlers.add(handler);
308321
subscriptions.set(channel, handlers);
309-
getSocket().emit('subscribePage', {
310-
...page,
311-
...(subscribeOptions?.afterReplayId
312-
? { afterReplayId: subscribeOptions.afterReplayId }
313-
: {}),
314-
});
322+
if (subscribeOptions?.afterReplayId) {
323+
originalReplayIdsByTopic.set(channel, subscribeOptions.afterReplayId);
324+
}
325+
if (isFirstTopicHandler) {
326+
emitSubscribePage(channel, page);
327+
}
315328
},
316329
unsubscribe: (channel: string, handler: NgxLowcodeWebSocketEventHandler) => {
317330
const handlers = subscriptions.get(channel);
318331
handlers?.delete(handler);
319332
if (handlers && handlers.size === 0) {
320333
subscriptions.delete(channel);
334+
originalReplayIdsByTopic.delete(channel);
335+
latestReplayIdsByTopic.delete(channel);
321336
}
322337
},
323338
disconnect: () => {
324339
subscriptions.clear();
340+
originalReplayIdsByTopic.clear();
341+
latestReplayIdsByTopic.clear();
325342
socket?.off('runtimeManagerExecuted', handleManagerExecuted);
343+
socket?.off('connect', handleConnect);
326344
socket?.disconnect();
327345
socket = undefined;
328346
listenerAttached = false;
@@ -333,10 +351,19 @@ export function createSocketIoWebSocketManager(
333351
socket ??= socketFactory(socketUrl, socketOptions);
334352
if (!listenerAttached) {
335353
socket.on('runtimeManagerExecuted', handleManagerExecuted);
354+
socket.on('connect', handleConnect);
336355
listenerAttached = true;
337356
}
338357
return socket;
339358
}
359+
360+
function emitSubscribePage(channel: string, page = parseRuntimePageTopic(channel)): void {
361+
const replayId = latestReplayIdsByTopic.get(channel) ?? originalReplayIdsByTopic.get(channel);
362+
getSocket().emit('subscribePage', {
363+
...page,
364+
...(replayId ? { afterReplayId: replayId } : {}),
365+
});
366+
}
340367
}
341368

342369
export function parseRuntimePageTopic(topic: string): RuntimePageTopicRef {

0 commit comments

Comments
 (0)