-
-
Notifications
You must be signed in to change notification settings - Fork 8.3k
Expand file tree
/
Copy pathio-adapter.ts
More file actions
98 lines (90 loc) · 2.97 KB
/
io-adapter.ts
File metadata and controls
98 lines (90 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import { isFunction, isNil } from '@nestjs/common/utils/shared.utils';
import {
AbstractWsAdapter,
MessageMappingProperties,
} from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { fromEvent, Observable } from 'rxjs';
import { filter, first, map, mergeMap, share, takeUntil } from 'rxjs/operators';
import { Server, ServerOptions, Socket } from 'socket.io';
/**
* @publicApi
*/
export class IoAdapter extends AbstractWsAdapter {
private readonly disconnectMap = new WeakMap<Socket, Observable<any>>();
public create(
port: number,
options?: ServerOptions & { namespace?: string; server?: any },
): Server {
if (!options) {
return this.createIOServer(port);
}
const { namespace, server, ...opt } = options;
return server && isFunction(server.of)
? server.of(namespace)
: namespace
? this.createIOServer(port, opt).of(namespace)
: this.createIOServer(port, opt);
}
public createIOServer(port: number, options?: any): any {
if (this.httpServer && port === 0) {
return new Server(this.httpServer, options);
}
return new Server(port, options);
}
public bindMessageHandlers(
socket: Socket,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
let disconnect$ = this.disconnectMap.get(socket);
if (!disconnect$) {
disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(share(), first());
this.disconnectMap.set(socket, disconnect$);
}
handlers.forEach(({ message, callback, isAckHandledManually }) => {
const source$ = fromEvent(socket, message).pipe(
mergeMap((payload: any) => {
const { data, ack } = this.mapPayload(payload);
return transform(callback(data, ack)).pipe(
filter((response: any) => !isNil(response)),
map((response: any) => [response, ack, isAckHandledManually]),
);
}),
takeUntil(disconnect$),
);
source$.subscribe(([response, ack, isAckHandledManually]) => {
if (response.event) {
return socket.emit(response.event, response.data);
}
if (!isAckHandledManually && isFunction(ack)) {
ack(response);
}
});
});
}
public mapPayload(payload: unknown): { data: any; ack?: Function } {
if (!Array.isArray(payload)) {
if (isFunction(payload)) {
return { data: undefined, ack: payload };
}
return { data: payload };
}
const lastElement = payload[payload.length - 1];
const isAck = isFunction(lastElement);
if (isAck) {
const size = payload.length - 1;
return {
data: size === 1 ? payload[0] : payload.slice(0, size),
ack: lastElement,
};
}
return { data: payload };
}
public async close(server: Server): Promise<void> {
if (this.forceCloseConnections && server.httpServer === this.httpServer) {
return;
}
return super.close(server);
}
}