Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions packages/voice/__tests__/VoiceReceiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ describe('VoiceReceiver', () => {
expect(stream.read()).toEqual(RTP_PACKET.opusFrame);
});

test.each([
['Desktop', RTP_PACKET_DESKTOP, 10_217, 4_157_324_497],
['Chrome', RTP_PACKET_CHROME, 18_143, 660_155_095],
['Android', RTP_PACKET_ANDROID, 14_800, 3_763_991_879],
])('onUdpMessage: RTP header metadata from %s', async (_testName, RTP_PACKET, expectedSeq, expectedTs) => {
receiver['decrypt'] = vitest.fn().mockImplementationOnce(() => RTP_PACKET.decrypted);

const spy = vitest.spyOn(receiver.ssrcMap, 'get');
spy.mockImplementation(() => ({
audioSSRC: RTP_PACKET.ssrc,
userId: '123',
}));

const stream = receiver.subscribe('123');

receiver['onUdpMessage'](RTP_PACKET.packet);
await nextTick();
const packet = stream.read();
expect(packet.sequence).toEqual(expectedSeq);
expect(packet.timestamp).toEqual(expectedTs);
expect(packet.ssrc).toEqual(RTP_PACKET.ssrc);
});

test('onUdpMessage: AudioPacket is backwards compatible', async () => {
receiver['decrypt'] = vitest.fn().mockImplementationOnce(() => RTP_PACKET_DESKTOP.decrypted);

const spy = vitest.spyOn(receiver.ssrcMap, 'get');
spy.mockImplementation(() => ({
audioSSRC: RTP_PACKET_DESKTOP.ssrc,
userId: '123',
}));

const stream = receiver.subscribe('123');

receiver['onUdpMessage'](RTP_PACKET_DESKTOP.packet);
await nextTick();
const packet = stream.read();
expect(Buffer.isBuffer(packet)).toBe(true);
expect(packet).toEqual(RTP_PACKET_DESKTOP.opusFrame);
});

test('onUdpMessage: <8 bytes packet', () => {
expect(() => receiver['onUdpMessage'](Buffer.alloc(4))).not.toThrow();
});
Expand Down
22 changes: 22 additions & 0 deletions packages/voice/src/receive/AudioReceiveStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,28 @@ export interface AudioReceiveStreamOptions extends ReadableOptions {
end: EndBehavior;
}

/**
* A Buffer containing encoded Opus packet data and key RTP Header metadata.
*/
export interface AudioPacket extends Buffer {
/**
* The RTP sequence number of this packet (16-bit, wraps at 65535).
*/
readonly sequence: number;

/**
* The synchronization source identifier for this packet (32-bit).
* A change in SSRC indicates a new RTP stream, so any associated
* decoder should be reset.
*/
readonly ssrc: number;

/**
* The RTP timestamp of this packet (32-bit, wraps at 2^32 - 1).
*/
readonly timestamp: number;
}

export function createDefaultAudioReceiveStreamOptions(): AudioReceiveStreamOptions {
return {
end: {
Expand Down
34 changes: 30 additions & 4 deletions packages/voice/src/receive/VoiceReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { methods } from '../util/Secretbox';
import {
AudioReceiveStream,
createDefaultAudioReceiveStreamOptions,
type AudioPacket,
type AudioReceiveStreamOptions,
} from './AudioReceiveStream';
import { SSRCMap } from './SSRCMap';
Expand Down Expand Up @@ -131,9 +132,14 @@ export class VoiceReceiver {
* @param nonce - The nonce buffer used by the connection for encryption
* @param secretKey - The secret key used by the connection for encryption
* @param userId - The user id that sent the packet
* @param ssrc - already-parsed SSRC (Synchronization Source Identifier) from the RTP Header
* @returns The parsed Opus packet
*/
private parsePacket(buffer: Buffer, mode: string, nonce: Buffer, secretKey: Uint8Array, userId: string) {
private parsePacket(buffer: Buffer, mode: string, nonce: Buffer, secretKey: Uint8Array, userId: string, ssrc: number) {
// Parse key RTP Header fields
const sequence = buffer.readUInt16BE(2);
const timestamp = buffer.readUInt32BE(4);

let packet: Buffer = this.decrypt(buffer, mode, nonce, secretKey);
if (!packet) throw new Error('Failed to parse packet');

Expand All @@ -154,7 +160,26 @@ export class VoiceReceiver {
if (daveSession) packet = daveSession.decrypt(packet, userId)!;
}

return packet;
// Extend packet with RTP header information
return VoiceReceiver.addPacketHeaders(packet, sequence, timestamp, ssrc);
}

/**
* Extends the Buffer for Opus audio data with RTP Header information
*
* @param buffer the opus packet data to extend
* @param sequence the sequence number of the packet
* @param timestamp see definition in
* @param ssrc x
* @returns the input buffer, with RTP header information added
*/
private static addPacketHeaders(buffer: Buffer, sequence: number, timestamp: number, ssrc: number): AudioPacket {
Object.defineProperties(buffer, {
sequence: { value: sequence, writable: false, enumerable: false, configurable: false },
timestamp: { value: timestamp, writable: false, enumerable: false, configurable: false },
ssrc: { value: ssrc, writable: false, enumerable: false, configurable: false },
});
return buffer as AudioPacket;
}

/**
Expand All @@ -164,7 +189,7 @@ export class VoiceReceiver {
* @internal
*/
public onUdpMessage(msg: Buffer) {
if (msg.length <= 8) return;
if (msg.length <= 12) return;
const ssrc = msg.readUInt32BE(8);

const userData = this.ssrcMap.get(ssrc);
Expand All @@ -183,7 +208,8 @@ export class VoiceReceiver {
this.connectionData.nonceBuffer,
this.connectionData.secretKey,
userData.userId,
);
ssrc
);
if (packet) stream.push(packet);
} catch (error) {
stream.destroy(error as Error);
Expand Down
Loading