Skip to content

Commit 0613a63

Browse files
committed
feat: optimize logic
1 parent a52d08e commit 0613a63

1 file changed

Lines changed: 118 additions & 58 deletions

File tree

src/server/modules/socket-io/socket-io.ts

Lines changed: 118 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ interface MediaRoomPeer {
3333
}
3434

3535
interface BroadcasterStoreInfo {
36-
status: 'ready' | 'producing' | 'disconnected';
36+
status: 'ready' | 'broadcasting';
37+
broadcastingTrackIds: string[];
3738
}
3839

3940
interface BroadcasterStoreTracks {
@@ -109,14 +110,8 @@ export default class SocketIOServer {
109110

110111
socket.on('disconnect', async (reason) => {
111112
console.log(`[socket.disconnect] [${alias}:${userId}] broadcaster:`, broadcasterId, reason);
112-
// TODO clear
113-
const info = getRedisJsonResp<BroadcasterStoreInfo>(
114-
await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
115-
);
116-
if (info) {
117-
info.status = 'disconnected';
118-
await this.redis.set(`${this.getBroadcasterRoomKey(alias, userId)}:info`, JSON.stringify(info));
119-
}
113+
// 粗暴但有效的做法,一旦推流方断连就清空所有,下次需要重新走一套流程
114+
await this.clearRoomAndAllData(alias, userId);
120115
});
121116

122117
registerSocketEvent(socket, 'getContestInfo', async () => {
@@ -136,13 +131,17 @@ export default class SocketIOServer {
136131
};
137132
});
138133

139-
registerSocketEvent(socket, 'confirmReady', async (data) => {
134+
/**
135+
* confirmReady: 确认准备就绪,服务端创建 media room 并创建 transport
136+
*/
137+
registerSocketEvent(socket, 'confirmReady', async (data: { tracks: BroadcasterStoreTracks[] }) => {
140138
console.log(`[socket.confirmReady] [${alias}:${userId}] data:`, data);
141139
socket.join(this.getBroadcasterRoomKey(alias, userId));
142140
await this.redis.set(
143141
`${this.getBroadcasterRoomKey(alias, userId)}:info`,
144142
JSON.stringify({
145143
status: 'ready',
144+
broadcastingTrackIds: [],
146145
}),
147146
);
148147
await this.redis.set(`${this.getBroadcasterRoomKey(alias, userId)}:tracks`, JSON.stringify(data.tracks));
@@ -158,60 +157,47 @@ export default class SocketIOServer {
158157
transport,
159158
trackProducers: new Map(),
160159
};
161-
room.broadcaster = broadcasterPeer;
162160
room.peers.set(broadcasterId, broadcasterPeer);
161+
room.broadcaster = broadcasterPeer; // alias to peers[broadcasterId]
163162
this.mediaRooms.set(roomKey, room);
164163
console.log(`[socket.confirmReady] [${alias}:${userId}] created media room: ${roomKey}`);
165164
console.log(`[socket.confirmReady] [${alias}:${userId}] joined broadcaster: ${broadcasterId}`);
166165

167-
// TODO remove this temp trigger requestStartBroadcast
168-
setTimeout(async () => {
169-
const info = getRedisJsonResp<BroadcasterStoreInfo>(
170-
await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
171-
);
172-
if (!info || info.status !== 'ready') {
173-
return;
174-
}
175-
const tracks = getRedisJsonResp<BroadcasterStoreTracks[]>(
176-
await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:tracks`),
177-
);
178-
if (!tracks || tracks.length === 0) {
179-
return;
180-
}
181-
console.log(
182-
`[socket.confirmReady] [${alias}:${userId}] temp trigger requestStartBroadcast:`,
183-
tracks.map((track: any) => track.trackId),
184-
);
185-
socket.emit('requestStartBroadcast', {
186-
trackIds: tracks.map((track: any) => track.trackId),
187-
transport: {
188-
id: transport.id,
189-
iceParameters: transport.iceParameters,
190-
iceCandidates: transport.iceCandidates,
191-
dtlsParameters: transport.dtlsParameters,
192-
},
193-
routerRtpCapabilities: this.mediasoupRouter.rtpCapabilities,
194-
});
195-
}, 2000);
166+
// temp trigger requestStartBroadcast
167+
// setTimeout(async () => {
168+
// const info = getRedisJsonResp<BroadcasterStoreInfo>(
169+
// await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
170+
// );
171+
// if (!info || info.status !== 'ready') {
172+
// return;
173+
// }
174+
// const tracks = getRedisJsonResp<BroadcasterStoreTracks[]>(
175+
// await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:tracks`),
176+
// );
177+
// if (!tracks || tracks.length === 0) {
178+
// return;
179+
// }
180+
// console.log(
181+
// `[socket.confirmReady] [${alias}:${userId}] temp trigger requestStartBroadcast:`,
182+
// tracks.map((track: any) => track.trackId),
183+
// );
184+
// socket.emit('requestStartBroadcast', {
185+
// trackIds: tracks.map((track: any) => track.trackId),
186+
// transport: {
187+
// id: transport.id,
188+
// iceParameters: transport.iceParameters,
189+
// iceCandidates: transport.iceCandidates,
190+
// dtlsParameters: transport.dtlsParameters,
191+
// },
192+
// routerRtpCapabilities: this.mediasoupRouter.rtpCapabilities,
193+
// });
194+
// }, 2000);
196195
});
197196

198197
registerSocketEvent(socket, 'cancelReady', async () => {
199198
console.log(`[socket.cancelReady] [${alias}:${userId}]`);
200199
socket.leave(this.getBroadcasterRoomKey(alias, userId));
201-
await this.redis.del(`${this.getBroadcasterRoomKey(alias, userId)}:info`);
202-
await this.redis.del(`${this.getBroadcasterRoomKey(alias, userId)}:tracks`);
203-
204-
const room = this.mediaRooms.get(this.getMediaRoomKey(alias, userId));
205-
if (room) {
206-
room.peers.forEach((peer) => {
207-
peer.trackProducers?.forEach((producer) => {
208-
producer.close();
209-
});
210-
peer.transport.close();
211-
});
212-
room.peers.clear();
213-
this.mediaRooms.delete(this.getMediaRoomKey(alias, userId));
214-
}
200+
await this.clearRoomAndAllData(alias, userId);
215201
});
216202

217203
registerSocketEvent(socket, 'completeConnectTransport', async (data: { dtlsParameters: DtlsParameters }) => {
@@ -258,8 +244,18 @@ export default class SocketIOServer {
258244
});
259245
console.log(`[socket.produce] [${alias}:${userId}] produced track:`, producer.id);
260246
peer.trackProducers?.set(data.trackId, producer);
247+
const info = getRedisJsonResp<BroadcasterStoreInfo>(
248+
await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
249+
);
250+
if (info) {
251+
info.status = 'broadcasting';
252+
info.broadcastingTrackIds.push(data.trackId);
253+
await this.redis.set(`${this.getBroadcasterRoomKey(alias, userId)}:info`, JSON.stringify(info));
254+
}
261255
return {
262256
producerId: producer.id,
257+
type: producer.type,
258+
appData: producer.appData,
263259
};
264260
},
265261
);
@@ -287,11 +283,19 @@ export default class SocketIOServer {
287283
console.log(`[socket.connection] [${alias}:${userId}] viewer:`, viewerId);
288284
socket.join(this.getViewerRoomKey(alias, userId));
289285

286+
/**
287+
* 断连后此 viewer 相关 peer 信息和 transport 都会被清理,需要重新 joinBroadcastRoom
288+
*/
290289
socket.on('disconnect', () => {
291290
console.log(`[socket.disconnect] [${alias}:${userId}] viewer:`, viewerId);
291+
const mediaRoom = this.mediaRooms.get(this.getMediaRoomKey(alias, userId));
292+
const peer = mediaRoom?.peers.get(viewerId);
293+
peer?.transport.close();
294+
mediaRoom?.peers.delete(viewerId);
295+
mediaRoom?.viewers.delete(viewerId);
292296
});
293297

294-
registerSocketEvent(socket, 'requestBroadcast', async (data: { trackIds: string[] }) => {
298+
registerSocketEvent(socket, 'startBroadcast', async (data: { trackIds: string[] }) => {
295299
const info = getRedisJsonResp<BroadcasterStoreInfo>(
296300
await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
297301
);
@@ -316,9 +320,12 @@ export default class SocketIOServer {
316320
const availableTracks = data.trackIds.filter((trackId) => {
317321
return tracks.some((track: any) => track.trackId === trackId);
318322
});
319-
console.log(`[socket.requestBroadcast] [${alias}:${userId}] tracks:`, availableTracks);
323+
console.log(`[socket.startBroadcast] [${alias}:${userId}] tracks:`, availableTracks);
320324
if (availableTracks.length > 0) {
321-
socket.emit('requestStartBroadcast', {
325+
console.log(
326+
`[socket.emit.requestStartBroadcast] [${alias}:${userId}] requesting start broadcast to broadcaster`,
327+
);
328+
this.broadcasterNsp.to(this.getBroadcasterRoomKey(alias, userId)).emit('requestStartBroadcast', {
322329
trackIds: availableTracks,
323330
transport: {
324331
id: broadcasterPeer.transport.id,
@@ -344,7 +351,8 @@ export default class SocketIOServer {
344351
const viewerPeer: MediaRoomPeer = {
345352
transport,
346353
};
347-
mediaRoom.viewers.set(viewerId, viewerPeer);
354+
mediaRoom.peers.set(viewerId, viewerPeer);
355+
mediaRoom.viewers.set(viewerId, viewerPeer); // alias to peers[viewerId]
348356
console.log(`[socket.joinBroadcastRoom] [${alias}:${userId}] joined viewer:`, viewerId);
349357
return {
350358
transport: {
@@ -424,9 +432,42 @@ export default class SocketIOServer {
424432
console.log(`[socket.consume] [${alias}:${userId}] consumed track:`, consumer.id);
425433
return {
426434
consumerId: consumer.id,
435+
producerId: producer.id,
436+
kind: consumer.kind,
437+
rtpParameters: consumer.rtpParameters,
438+
type: consumer.type,
439+
producerPaused: consumer.producerPaused,
440+
appData: consumer.appData,
427441
};
428442
},
429443
);
444+
445+
registerSocketEvent(socket, 'stopBroadcast', async () => {
446+
console.log(`[socket.stopBroadcast] [${alias}:${userId}]`);
447+
const mediaRoom = this.mediaRooms.get(this.getMediaRoomKey(alias, userId));
448+
if (!mediaRoom) {
449+
throw new LogicException(ErrCode.BroadcastMediaRoomBroken);
450+
}
451+
this.broadcasterNsp.to(this.getBroadcasterRoomKey(alias, userId)).emit('requestStopBroadcast', async () => {
452+
console.log(
453+
`[socket.emit.requestStopBroadcast] [${alias}:${userId}] received broadcaster ack, cleaning up producers`,
454+
);
455+
// 仅清理 producers 相关,不关闭 transport
456+
const info = getRedisJsonResp<BroadcasterStoreInfo>(
457+
await this.redis.get(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
458+
);
459+
if (info) {
460+
info.status = 'ready';
461+
info.broadcastingTrackIds = [];
462+
await this.redis.set(`${this.getBroadcasterRoomKey(alias, userId)}:info`, JSON.stringify(info));
463+
}
464+
mediaRoom.broadcaster?.trackProducers?.forEach((producer) => {
465+
producer.close();
466+
});
467+
mediaRoom.broadcaster?.trackProducers?.clear();
468+
this.viewerNsp.to(this.getViewerRoomKey(alias, userId)).emit('broadcastStopped');
469+
});
470+
});
430471
});
431472
}
432473

@@ -450,6 +491,25 @@ export default class SocketIOServer {
450491
};
451492
return room;
452493
}
494+
495+
private async clearRoomAndAllData(alias: string, userId: string) {
496+
await Promise.all([
497+
this.redis.del(`${this.getBroadcasterRoomKey(alias, userId)}:info`),
498+
this.redis.del(`${this.getBroadcasterRoomKey(alias, userId)}:tracks`),
499+
]);
500+
501+
const room = this.mediaRooms.get(this.getMediaRoomKey(alias, userId));
502+
if (room) {
503+
room.broadcaster?.trackProducers?.forEach((producer) => {
504+
producer.close();
505+
});
506+
room.peers.forEach((peer) => {
507+
peer.transport.close();
508+
});
509+
room.peers.clear();
510+
this.mediaRooms.delete(this.getMediaRoomKey(alias, userId));
511+
}
512+
}
453513
}
454514

455515
function handleError(e: any) {

0 commit comments

Comments
 (0)