Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/voice/__mocks__/rtp.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Buffer } from 'node:buffer';

// The following constants are silence packets collected from various platforms because Discord did not previously send header extensions
// The header extension (extra data in decrypted vs opusFrame) can be detected in the position of {encrypted.subarray(12,14)} if it is equal to 0xbe,0xde
// The header extension length will then follow as an integer and can be removed from the decrypted data (see ../src/receive/VoiceReceiver.ts:parsePacket)
// The header extension (extra data in decrypted vs opusFrame) is indicated by the X bit (4th bit of byte 0) in the RTP header
// The header extension length follows the CSRC identifiers and can be removed from the decrypted data (see ../src/receive/VoiceReceiver.ts:parsePacket)

export const RTP_PACKET_DESKTOP = {
ssrc: 341_124,
Expand Down Expand Up @@ -48,6 +48,7 @@ export const XCHACHA20_SAMPLE = {
133, 174, 108, 144, 251, 110,
]),

// First 8 bytes are Header Extension payload
decrypted: Buffer.from([
0x32, 0x64, 0xe6, 0x62, 0x10, 0xe3, 0x90, 0x02, 0x78, 0x07, 0xd6, 0x2f, 0x52, 0x23, 0x20, 0x9a, 0xab, 0x2c, 0xcc,
0x1c, 0x88, 0x8e, 0xcb, 0xd9, 0x4d, 0xe5, 0x33, 0x7a, 0x4b, 0x2b, 0xed, 0xa7, 0xaf, 0x5f, 0x8d, 0xb2, 0x59, 0x99,
Expand Down
15 changes: 8 additions & 7 deletions packages/voice/__tests__/AudioReceiveStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,26 @@ import { describe, test, expect } from 'vitest';
import { SILENCE_FRAME } from '../src/audio/AudioPlayer';
import { AudioReceiveStream, EndBehaviorType } from '../src/receive/AudioReceiveStream';

const DUMMY_BUFFER = Buffer.allocUnsafe(16);
const DUMMY_PACKET = { payload: Buffer.allocUnsafe(16), sequence: 0, timestamp: 0, ssrc: 0 };
const SILENCE_PACKET = { payload: SILENCE_FRAME, sequence: 0, timestamp: 0, ssrc: 0 };

async function wait(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async function stepSilence(stream: AudioReceiveStream, increment: number) {
stream.push(SILENCE_FRAME);
stream.push(SILENCE_PACKET);
await wait(increment);
expect(stream.readable).toEqual(true);
}

describe('AudioReceiveStream', () => {
test('Manual end behavior', async () => {
const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.Manual } });
stream.push(DUMMY_BUFFER);
stream.push(DUMMY_PACKET);
expect(stream.readable).toEqual(true);
await wait(200);
stream.push(DUMMY_BUFFER);
stream.push(DUMMY_PACKET);
expect(stream.readable).toEqual(true);
stream.push(null);
await wait(200);
Expand All @@ -40,7 +41,7 @@ describe('AudioReceiveStream', () => {
await stepSilence(stream, increment);
}

stream.push(DUMMY_BUFFER);
stream.push(DUMMY_PACKET);

await wait(duration);
expect(stream.readableEnded).toEqual(true);
Expand All @@ -57,7 +58,7 @@ describe('AudioReceiveStream', () => {
await stepSilence(stream, increment);
}

stream.push(DUMMY_BUFFER);
stream.push(DUMMY_PACKET);

for (let index = increment; index < duration; index += increment) {
await stepSilence(stream, increment);
Expand All @@ -75,7 +76,7 @@ describe('AudioReceiveStream', () => {
const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } });
stream.resume();

stream.push(DUMMY_BUFFER);
stream.push(DUMMY_PACKET);

expect(stream.readable).toEqual(true);
expect(stream.readableEnded).toEqual(false);
Expand Down
74 changes: 66 additions & 8 deletions packages/voice/__tests__/VoiceReceiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,52 @@ describe('VoiceReceiver', () => {

receiver['onUdpMessage'](RTP_PACKET.packet);
await nextTick();
expect(stream.read()).toEqual(RTP_PACKET.opusFrame);
const packet = stream.read();
expect(packet.payload).toEqual(RTP_PACKET.opusFrame);
});

test.each([
['Desktop', RTP_PACKET_DESKTOP, 10_217, 4_157_324_497],
['Chrome', RTP_PACKET_CHROME, 18_143, 660_155_095],
['Android', RTP_PACKET_ANDROID, 14_800, 3_763_991_879],
])('onUdpMessage: RTP header metadata from %s', async (_testName, RTP_PACKET, expectedSeq, expectedTs) => {
receiver['decrypt'] = vitest.fn().mockImplementationOnce(() => RTP_PACKET.decrypted);

const spy = vitest.spyOn(receiver.ssrcMap, 'get');
spy.mockImplementation(() => ({
audioSSRC: RTP_PACKET.ssrc,
userId: '123',
}));

const stream = receiver.subscribe('123');

receiver['onUdpMessage'](RTP_PACKET.packet);
await nextTick();
const packet = stream.read();
expect(packet.sequence).toEqual(expectedSeq);
expect(packet.timestamp).toEqual(expectedTs);
expect(packet.ssrc).toEqual(RTP_PACKET.ssrc);
});

test('onUdpMessage: AudioPacket has payload and header fields', async () => {
receiver['decrypt'] = vitest.fn().mockImplementationOnce(() => RTP_PACKET_DESKTOP.decrypted);

const spy = vitest.spyOn(receiver.ssrcMap, 'get');
spy.mockImplementation(() => ({
audioSSRC: RTP_PACKET_DESKTOP.ssrc,
userId: '123',
}));

const stream = receiver.subscribe('123');

receiver['onUdpMessage'](RTP_PACKET_DESKTOP.packet);
await nextTick();
const packet = stream.read();
expect(Buffer.isBuffer(packet.payload)).toBe(true);
expect(packet.payload).toEqual(RTP_PACKET_DESKTOP.opusFrame);
expect(typeof packet.sequence).toBe('number');
expect(typeof packet.timestamp).toBe('number');
expect(typeof packet.ssrc).toBe('number');
});

test('onUdpMessage: <8 bytes packet', () => {
Expand Down Expand Up @@ -131,15 +176,17 @@ describe('VoiceReceiver', () => {
});
});

describe('decrypt', () => {
test('decrypt: aead_xchacha20_poly1305_rtpsize', () => {
describe('parsePacket', () => {
test('parsePacket: aead_xchacha20_poly1305_rtpsize', () => {
const nonceSpace = Buffer.alloc(24);

const decrypted = receiver['decrypt'](
const packet = receiver['parsePacket'](
XCHACHA20_SAMPLE.encrypted,
'aead_xchacha20_poly1305_rtpsize',
nonceSpace,
XCHACHA20_SAMPLE.key,
'123',
48_921,
);

const expectedNonce = Buffer.concat([
Expand All @@ -148,17 +195,24 @@ describe('VoiceReceiver', () => {
]);

expect(nonceSpace.equals(expectedNonce)).toEqual(true);
expect(decrypted.equals(XCHACHA20_SAMPLE.decrypted)).toEqual(true);
// Extension data (8 bytes) should be stripped from the 61-byte decrypted payload
expect(packet!.payload).toHaveLength(53);
expect(packet!.payload.equals(XCHACHA20_SAMPLE.decrypted.subarray(8))).toEqual(true);
expect(packet!.sequence).toEqual(22_118);
expect(packet!.timestamp).toEqual(3_220_386_864);
expect(packet!.ssrc).toEqual(48_921);
});

test('decrypt: aead_aes256gcm_rtpsize', () => {
test('parsePacket: aead_aes256gcm_rtpsize', () => {
const nonceSpace = Buffer.alloc(12);

const decrypted = receiver['decrypt'](
const packet = receiver['parsePacket'](
AES256GCM_SAMPLE.encrypted,
'aead_aes256_gcm_rtpsize',
nonceSpace,
AES256GCM_SAMPLE.key,
'123',
50_615,
);

const expectedNonce = Buffer.concat([
Expand All @@ -167,7 +221,11 @@ describe('VoiceReceiver', () => {
]);

expect(nonceSpace.equals(expectedNonce)).toEqual(true);
expect(decrypted.equals(AES256GCM_SAMPLE.decrypted)).toEqual(true);
// No extension (X=0), so decrypted payload is the opus frame directly
expect(packet!.payload.equals(AES256GCM_SAMPLE.decrypted)).toEqual(true);
expect(packet!.sequence).toEqual(41_884);
expect(packet!.timestamp).toEqual(2_668_332_016);
expect(packet!.ssrc).toEqual(50_615);
});
});
});
52 changes: 47 additions & 5 deletions packages/voice/src/receive/AudioReceiveStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,48 @@ export interface AudioReceiveStreamOptions extends ReadableOptions {
end: EndBehavior;
}

/**
* An audio packet containing encoded Opus payload data and key RTP Header metadata.
*/
export class AudioPacket {
/**
* Encoded Opus payload data.
*/
public readonly payload: Buffer;

/**
* RTP sequence number of this packet (16-bit, wraps at 65535).
*/
public readonly sequence: number;

/**
* RTP synchronization source identifier for this packet (32-bit).
* A change in SSRC indicates a new RTP stream, so any associated
* decoder should be reset.
*/
public readonly ssrc: number;

/**
* RTP timestamp of this packet (32-bit, wraps at 2^32 - 1, 48kHz clock).
*/
public readonly timestamp: number;

/**
* Construct a new AudioPacket.
*
* @param payload - Opus payload
* @param sequence - RTP Sequence Number
* @param timestamp - RTP Timestamp
* @param ssrc - RTP Synchronization Source Identifier
*/
public constructor(payload: Buffer, sequence: number, timestamp: number, ssrc: number) {
this.payload = payload;
this.sequence = sequence;
this.timestamp = timestamp;
this.ssrc = ssrc;
}
}

export function createDefaultAudioReceiveStreamOptions(): AudioReceiveStreamOptions {
return {
end: {
Expand Down Expand Up @@ -67,22 +109,22 @@ export class AudioReceiveStream extends Readable {
this.end = end;
}

public override push(buffer: Buffer | null) {
public override push(packet: AudioPacket | null) {
if (
buffer &&
packet &&
(this.end.behavior === EndBehaviorType.AfterInactivity ||
(this.end.behavior === EndBehaviorType.AfterSilence &&
(buffer.compare(SILENCE_FRAME) !== 0 || this.endTimeout === undefined)))
(packet.payload.compare(SILENCE_FRAME) !== 0 || this.endTimeout === undefined)))
) {
this.renewEndTimeout(this.end);
}

if (buffer === null) {
if (packet === null) {
// null marks EOF for stream
process.nextTick(() => this.destroy());
}

return super.push(buffer);
return super.push(packet);
}

private renewEndTimeout(end: EndBehavior & { duration: number }) {
Expand Down
Loading