Skip to content

Commit 1fc4655

Browse files
authored
refactor(onebot): 精简 WebSocket 适配器实现 (#1644)
* refactor(onebot): 移除 `async-mutex` 依赖 * fix(onebot): 避免重复发送 WebSocket pong
1 parent eb07cdb commit 1fc4655

7 files changed

Lines changed: 69 additions & 134 deletions

File tree

packages/napcat-onebot/network/http-server.ts

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@ import { URL } from 'url';
1313
import { ActionName } from '@/napcat-onebot/action/router';
1414
import { OB11HeartbeatEvent } from '@/napcat-onebot/event/meta/OB11HeartbeatEvent';
1515
import { OB11LifeCycleEvent, LifeCycleSubType } from '@/napcat-onebot/event/meta/OB11LifeCycleEvent';
16-
import { Mutex } from 'async-mutex';
1716

1817
export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig> {
1918
private app: Express | undefined;
2019
private server: http.Server | undefined;
2120
private wsServer?: WebSocketServer;
2221
private wsClients: WebSocket[] = [];
23-
private wsClientsMutex = new Mutex();
2422
private heartbeatIntervalId: NodeJS.Timeout | null = null;
2523
private wsClientWithEvent: WebSocket[] = [];
2624

@@ -30,19 +28,17 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
3028

3129
override async onEvent<T extends OB11EmitEventContent> (event: T) {
3230
// http server is passive, no need to emit event
33-
this.wsClientsMutex.runExclusive(async () => {
34-
const promises = this.wsClientWithEvent.map((wsClient) => {
35-
return new Promise<void>((resolve, reject) => {
36-
if (wsClient.readyState === WebSocket.OPEN) {
37-
wsClient.send(JSON.stringify(event));
38-
resolve();
39-
} else {
40-
reject(new Error('WebSocket is not open'));
41-
}
42-
});
31+
const promises = this.wsClientWithEvent.map((wsClient) => {
32+
return new Promise<void>((resolve, reject) => {
33+
if (wsClient.readyState === WebSocket.OPEN) {
34+
wsClient.send(JSON.stringify(event));
35+
resolve();
36+
} else {
37+
reject(new Error('WebSocket is not open'));
38+
}
4339
});
44-
await Promise.allSettled(promises);
4540
});
41+
await Promise.allSettled(promises);
4642
}
4743

4844
open () {
@@ -65,13 +61,9 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
6561
this.server?.close();
6662
this.app = undefined;
6763
this.stopHeartbeat();
68-
await this.wsClientsMutex.runExclusive(async () => {
69-
this.wsClients.forEach((wsClient) => {
70-
wsClient.close();
71-
});
72-
this.wsClients = [];
73-
this.wsClientWithEvent = [];
74-
});
64+
this.wsClients.forEach((wsClient) => wsClient.close());
65+
this.wsClients = [];
66+
this.wsClientWithEvent = [];
7567
this.wsServer?.close();
7668
}
7769

@@ -153,36 +145,29 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
153145
wsClient.on('message', (message) => {
154146
this.handleWSMessage(wsClient, message).then().catch(e => this.logger.logError(e));
155147
});
156-
wsClient.on('ping', () => {
157-
wsClient.pong();
158-
});
159148
wsClient.on('pong', () => {
160149
// this.logger.logDebug('[OneBot] [HTTP WebSocket] Pong received');
161150
});
162151
wsClient.once('close', () => {
163-
this.wsClientsMutex.runExclusive(async () => {
164-
const NormolIndex = this.wsClients.indexOf(wsClient);
165-
if (NormolIndex !== -1) {
166-
this.wsClients.splice(NormolIndex, 1);
167-
}
168-
const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
169-
if (EventIndex !== -1) {
170-
this.wsClientWithEvent.splice(EventIndex, 1);
171-
}
172-
if (this.wsClientWithEvent.length === 0) {
173-
this.stopHeartbeat();
174-
}
175-
});
176-
});
177-
await this.wsClientsMutex.runExclusive(async () => {
178-
if (!isApiConnect) {
179-
this.wsClientWithEvent.push(wsClient);
152+
const NormolIndex = this.wsClients.indexOf(wsClient);
153+
if (NormolIndex !== -1) {
154+
this.wsClients.splice(NormolIndex, 1);
180155
}
181-
this.wsClients.push(wsClient);
182-
if (this.wsClientWithEvent.length > 0) {
183-
this.startHeartbeat();
156+
const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
157+
if (EventIndex !== -1) {
158+
this.wsClientWithEvent.splice(EventIndex, 1);
159+
}
160+
if (this.wsClientWithEvent.length === 0) {
161+
this.stopHeartbeat();
184162
}
185163
});
164+
if (!isApiConnect) {
165+
this.wsClientWithEvent.push(wsClient);
166+
}
167+
this.wsClients.push(wsClient);
168+
if (this.wsClientWithEvent.length > 0) {
169+
this.startHeartbeat();
170+
}
186171
}).on('error', (err) => this.logger.log('[OneBot] [HTTP WebSocket] Server Error:', err.message));
187172
}
188173

@@ -197,12 +182,10 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
197182
private startHeartbeat () {
198183
if (this.heartbeatIntervalId) return;
199184
this.heartbeatIntervalId = setInterval(() => {
200-
this.wsClientsMutex.runExclusive(async () => {
201-
this.wsClientWithEvent.forEach((wsClient) => {
202-
if (wsClient.readyState === WebSocket.OPEN) {
203-
wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, 30000, this.core.selfInfo.online ?? true, true)));
204-
}
205-
});
185+
this.wsClientWithEvent.forEach((wsClient) => {
186+
if (wsClient.readyState === WebSocket.OPEN) {
187+
wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, 30000, this.core.selfInfo.online ?? true, true)));
188+
}
206189
});
207190
}, 30000);
208191
}

packages/napcat-onebot/network/websocket-client.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,6 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
8585
},
8686

8787
});
88-
this.connection.on('ping', () => {
89-
this.connection?.pong();
90-
});
9188
this.connection.on('pong', () => {
9289
// this.logger.logDebug('[OneBot] [WebSocket Client] 收到pong');
9390
});

packages/napcat-onebot/network/websocket-server.ts

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { OB11EmitEventContent, OB11NetworkReloadType } from './index';
22
import { URL } from 'url';
33
import { RawData, WebSocket, WebSocketServer } from 'ws';
4-
import { Mutex } from 'async-mutex';
54
import { OB11Response } from '@/napcat-onebot/action/OneBotAction';
65
import { ActionName } from '@/napcat-onebot/action/router';
76
import { NapCatCore } from 'napcat-core';
@@ -17,7 +16,6 @@ import json5 from 'json5';
1716
export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketServerConfig> {
1817
wsServer?: WebSocketServer;
1918
wsClients: WebSocket[] = [];
20-
wsClientsMutex = new Mutex();
2119
private heartbeatIntervalId: NodeJS.Timeout | null = null;
2220
wsClientWithEvent: WebSocket[] = [];
2321

@@ -58,36 +56,29 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
5856
wsClient.on('message', (message) => {
5957
this.handleMessage(wsClient, message).then().catch(e => this.logger.logError(e));
6058
});
61-
wsClient.on('ping', () => {
62-
wsClient.pong();
63-
});
6459
wsClient.on('pong', () => {
6560
// this.logger.logDebug('[OneBot] [WebSocket Server] Pong received');
6661
});
6762
wsClient.once('close', () => {
68-
this.wsClientsMutex.runExclusive(async () => {
69-
const NormolIndex = this.wsClients.indexOf(wsClient);
70-
if (NormolIndex !== -1) {
71-
this.wsClients.splice(NormolIndex, 1);
72-
}
73-
const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
74-
if (EventIndex !== -1) {
75-
this.wsClientWithEvent.splice(EventIndex, 1);
76-
}
77-
if (this.wsClientWithEvent.length === 0) {
78-
this.stopHeartbeat();
79-
}
80-
});
81-
});
82-
await this.wsClientsMutex.runExclusive(async () => {
83-
if (!isApiConnect) {
84-
this.wsClientWithEvent.push(wsClient);
63+
const NormolIndex = this.wsClients.indexOf(wsClient);
64+
if (NormolIndex !== -1) {
65+
this.wsClients.splice(NormolIndex, 1);
66+
}
67+
const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
68+
if (EventIndex !== -1) {
69+
this.wsClientWithEvent.splice(EventIndex, 1);
8570
}
86-
this.wsClients.push(wsClient);
87-
if (this.wsClientWithEvent.length > 0) {
88-
this.startHeartbeat();
71+
if (this.wsClientWithEvent.length === 0) {
72+
this.stopHeartbeat();
8973
}
9074
});
75+
if (!isApiConnect) {
76+
this.wsClientWithEvent.push(wsClient);
77+
}
78+
this.wsClients.push(wsClient);
79+
if (this.wsClientWithEvent.length > 0) {
80+
this.startHeartbeat();
81+
}
9182
}).on('error', (err) => this.logger.log('[OneBot] [WebSocket Server] Server Error:', err.message));
9283
}
9384

@@ -100,19 +91,17 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
10091
}
10192

10293
async onEvent<T extends OB11EmitEventContent> (event: T) {
103-
this.wsClientsMutex.runExclusive(async () => {
104-
const promises = this.wsClientWithEvent.map((wsClient) => {
105-
return new Promise<void>((resolve, reject) => {
106-
if (wsClient.readyState === WebSocket.OPEN) {
107-
wsClient.send(JSON.stringify(event));
108-
resolve();
109-
} else {
110-
reject(new Error('WebSocket is not open'));
111-
}
112-
});
94+
const promises = this.wsClientWithEvent.map((wsClient) => {
95+
return new Promise<void>((resolve, reject) => {
96+
if (wsClient.readyState === WebSocket.OPEN) {
97+
wsClient.send(JSON.stringify(event));
98+
resolve();
99+
} else {
100+
reject(new Error('WebSocket is not open'));
101+
}
113102
});
114-
await Promise.allSettled(promises);
115103
});
104+
await Promise.allSettled(promises);
116105
}
117106

118107
open () {
@@ -136,24 +125,18 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
136125
}
137126
});
138127
this.stopHeartbeat();
139-
await this.wsClientsMutex.runExclusive(async () => {
140-
this.wsClients.forEach((wsClient) => {
141-
wsClient.close();
142-
});
143-
this.wsClients = [];
144-
this.wsClientWithEvent = [];
145-
});
128+
this.wsClients.forEach((wsClient) => wsClient.close());
129+
this.wsClients = [];
130+
this.wsClientWithEvent = [];
146131
}
147132

148133
private startHeartbeat () {
149134
if (this.heartbeatIntervalId || this.config.heartInterval <= 0) return;
150135
this.heartbeatIntervalId = setInterval(() => {
151-
this.wsClientsMutex.runExclusive(async () => {
152-
this.wsClientWithEvent.forEach((wsClient) => {
153-
if (wsClient.readyState === WebSocket.OPEN) {
154-
wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, this.config.heartInterval, this.core.selfInfo.online ?? true, true)));
155-
}
156-
});
136+
this.wsClientWithEvent.forEach((wsClient) => {
137+
if (wsClient.readyState === WebSocket.OPEN) {
138+
wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, this.config.heartInterval, this.core.selfInfo.online ?? true, true)));
139+
}
157140
});
158141
}, this.config.heartInterval);
159142
}

packages/napcat-onebot/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
"express": "^5.0.0",
2727
"ws": "^8.18.3",
2828
"file-type": "^21.0.0",
29-
"async-mutex": "^0.5.0",
3029
"napcat-protobuf": "workspace:*",
3130
"json5": "^2.2.3",
3231
"napcat-core": "workspace:*",

packages/napcat-types/external-shims.d.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,6 @@ declare module 'yaml' {
6161
export const stringify: (...args: any[]) => any;
6262
}
6363

64-
declare module 'async-mutex' {
65-
export class Mutex {
66-
acquire (): Promise<() => void>;
67-
runExclusive<T> (callback: () => T | Promise<T>): Promise<T>;
68-
}
69-
export class Semaphore {
70-
acquire (): Promise<[() => void, number]>;
71-
runExclusive<T> (callback: () => T | Promise<T>): Promise<T>;
72-
release (): void;
73-
}
74-
const _async_mutex_default: { Mutex: typeof Mutex; Semaphore: typeof Semaphore; };
75-
export default _async_mutex_default;
76-
}
77-
7864
declare module 'napcat-protobuf' {
7965
export class NapProtoMsg<T = any> {
8066
constructor (schema: any);

packages/napcat-types/scripts/post-build.mjs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ const EXTERNAL_TYPE_REPLACEMENTS = {
3535
'ValidateFunction<T>': 'any',
3636
// inversify
3737
'Container': 'any',
38-
// async-mutex
39-
'Mutex': 'any',
40-
'Semaphore': 'any',
4138
// napcat-protobuf
4239
'NapProtoDecodeStructType': 'any',
4340
'NapProtoEncodeStructType': 'any',
@@ -90,15 +87,15 @@ function replaceExternalTypes (content) {
9087
// 使用类型上下文的模式匹配
9188
const typeContextPatterns = [
9289
// : Type
93-
/:\s*(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[;,)\]\}|&]|$)/g,
90+
/:\s*(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[;,)\]\}|&]|$)/g,
9491
// <Type>
95-
/<(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)>/g,
92+
/<(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)>/g,
9693
// Type[]
97-
/(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)\[\]/g,
94+
/(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)\[\]/g,
9895
// extends Type
99-
/extends\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g,
96+
/extends\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g,
10097
// implements Type
101-
/implements\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g,
98+
/implements\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g,
10299
];
103100

104101
for (const pattern of typeContextPatterns) {

pnpm-lock.yaml

Lines changed: 0 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)