diff --git a/packages/voice/__mocks__/rtp.ts b/packages/voice/__mocks__/rtp.ts index 805477694aa9..8bd34a1a25c7 100644 --- a/packages/voice/__mocks__/rtp.ts +++ b/packages/voice/__mocks__/rtp.ts @@ -1,8 +1,8 @@ import { Buffer } from 'node:buffer'; // The following constants are silence packets collected from various platforms because Discord did not previously send header extensions -// The header extension (extra data in decrypted vs opusFrame) can be detected in the position of {encrypted.subarray(12,14)} if it is equal to 0xbe,0xde -// The header extension length will then follow as an integer and can be removed from the decrypted data (see ../src/receive/VoiceReceiver.ts:parsePacket) +// The header extension (extra data in decrypted vs opusFrame) is indicated by the X bit (4th bit of byte 0) in the RTP header +// The header extension length follows the CSRC identifiers and can be removed from the decrypted data (see ../src/receive/VoiceReceiver.ts:parsePacket) export const RTP_PACKET_DESKTOP = { ssrc: 341_124, @@ -48,6 +48,7 @@ export const XCHACHA20_SAMPLE = { 133, 174, 108, 144, 251, 110, ]), + // First 8 bytes are Header Extension payload decrypted: Buffer.from([ 0x32, 0x64, 0xe6, 0x62, 0x10, 0xe3, 0x90, 0x02, 0x78, 0x07, 0xd6, 0x2f, 0x52, 0x23, 0x20, 0x9a, 0xab, 0x2c, 0xcc, 0x1c, 0x88, 0x8e, 0xcb, 0xd9, 0x4d, 0xe5, 0x33, 0x7a, 0x4b, 0x2b, 0xed, 0xa7, 0xaf, 0x5f, 0x8d, 0xb2, 0x59, 0x99, diff --git a/packages/voice/__tests__/AudioReceiveStream.test.ts b/packages/voice/__tests__/AudioReceiveStream.test.ts index 6e650f2b437a..ec412ae0da2d 100644 --- a/packages/voice/__tests__/AudioReceiveStream.test.ts +++ b/packages/voice/__tests__/AudioReceiveStream.test.ts @@ -4,14 +4,15 @@ import { describe, test, expect } from 'vitest'; import { SILENCE_FRAME } from '../src/audio/AudioPlayer'; import { AudioReceiveStream, EndBehaviorType } from '../src/receive/AudioReceiveStream'; -const DUMMY_BUFFER = Buffer.allocUnsafe(16); +const DUMMY_PACKET = { payload: Buffer.allocUnsafe(16), sequence: 0, timestamp: 0, ssrc: 0 }; +const SILENCE_PACKET = { payload: SILENCE_FRAME, sequence: 0, timestamp: 0, ssrc: 0 }; async function wait(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } async function stepSilence(stream: AudioReceiveStream, increment: number) { - stream.push(SILENCE_FRAME); + stream.push(SILENCE_PACKET); await wait(increment); expect(stream.readable).toEqual(true); } @@ -19,10 +20,10 @@ async function stepSilence(stream: AudioReceiveStream, increment: number) { describe('AudioReceiveStream', () => { test('Manual end behavior', async () => { const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.Manual } }); - stream.push(DUMMY_BUFFER); + stream.push(DUMMY_PACKET); expect(stream.readable).toEqual(true); await wait(200); - stream.push(DUMMY_BUFFER); + stream.push(DUMMY_PACKET); expect(stream.readable).toEqual(true); stream.push(null); await wait(200); @@ -40,7 +41,7 @@ describe('AudioReceiveStream', () => { await stepSilence(stream, increment); } - stream.push(DUMMY_BUFFER); + stream.push(DUMMY_PACKET); await wait(duration); expect(stream.readableEnded).toEqual(true); @@ -57,7 +58,7 @@ describe('AudioReceiveStream', () => { await stepSilence(stream, increment); } - stream.push(DUMMY_BUFFER); + stream.push(DUMMY_PACKET); for (let index = increment; index < duration; index += increment) { await stepSilence(stream, increment); @@ -75,7 +76,7 @@ describe('AudioReceiveStream', () => { const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } }); stream.resume(); - stream.push(DUMMY_BUFFER); + stream.push(DUMMY_PACKET); expect(stream.readable).toEqual(true); expect(stream.readableEnded).toEqual(false); diff --git a/packages/voice/__tests__/VoiceReceiver.test.ts b/packages/voice/__tests__/VoiceReceiver.test.ts index 65702cd13e7d..69ce9ef9a506 100644 --- a/packages/voice/__tests__/VoiceReceiver.test.ts +++ b/packages/voice/__tests__/VoiceReceiver.test.ts @@ -65,7 +65,52 @@ describe('VoiceReceiver', () => { receiver['onUdpMessage'](RTP_PACKET.packet); await nextTick(); - expect(stream.read()).toEqual(RTP_PACKET.opusFrame); + const packet = stream.read(); + expect(packet.payload).toEqual(RTP_PACKET.opusFrame); + }); + + test.each([ + ['Desktop', RTP_PACKET_DESKTOP, 10_217, 4_157_324_497], + ['Chrome', RTP_PACKET_CHROME, 18_143, 660_155_095], + ['Android', RTP_PACKET_ANDROID, 14_800, 3_763_991_879], + ])('onUdpMessage: RTP header metadata from %s', async (_testName, RTP_PACKET, expectedSeq, expectedTs) => { + receiver['decrypt'] = vitest.fn().mockImplementationOnce(() => RTP_PACKET.decrypted); + + const spy = vitest.spyOn(receiver.ssrcMap, 'get'); + spy.mockImplementation(() => ({ + audioSSRC: RTP_PACKET.ssrc, + userId: '123', + })); + + const stream = receiver.subscribe('123'); + + receiver['onUdpMessage'](RTP_PACKET.packet); + await nextTick(); + const packet = stream.read(); + expect(packet.sequence).toEqual(expectedSeq); + expect(packet.timestamp).toEqual(expectedTs); + expect(packet.ssrc).toEqual(RTP_PACKET.ssrc); + }); + + test('onUdpMessage: AudioPacket has payload and header fields', async () => { + receiver['decrypt'] = vitest.fn().mockImplementationOnce(() => RTP_PACKET_DESKTOP.decrypted); + + const spy = vitest.spyOn(receiver.ssrcMap, 'get'); + spy.mockImplementation(() => ({ + audioSSRC: RTP_PACKET_DESKTOP.ssrc, + userId: '123', + })); + + const stream = receiver.subscribe('123'); + + receiver['onUdpMessage'](RTP_PACKET_DESKTOP.packet); + await nextTick(); + const packet = stream.read(); + expect(Buffer.isBuffer(packet.payload)).toBe(true); + expect(packet.payload).toEqual(RTP_PACKET_DESKTOP.opusFrame); + expect(typeof packet.sequence).toBe('number'); + expect(typeof packet.timestamp).toBe('number'); + expect(typeof packet.ssrc).toBe('number'); }); test('onUdpMessage: <8 bytes packet', () => { @@ -131,15 +176,17 @@ describe('VoiceReceiver', () => { }); }); - describe('decrypt', () => { - test('decrypt: aead_xchacha20_poly1305_rtpsize', () => { + describe('parsePacket', () => { + test('parsePacket: aead_xchacha20_poly1305_rtpsize', () => { const nonceSpace = Buffer.alloc(24); - const decrypted = receiver['decrypt']( + const packet = receiver['parsePacket']( XCHACHA20_SAMPLE.encrypted, 'aead_xchacha20_poly1305_rtpsize', nonceSpace, XCHACHA20_SAMPLE.key, + '123', + 48_921, ); const expectedNonce = Buffer.concat([ @@ -148,17 +195,24 @@ describe('VoiceReceiver', () => { ]); expect(nonceSpace.equals(expectedNonce)).toEqual(true); - expect(decrypted.equals(XCHACHA20_SAMPLE.decrypted)).toEqual(true); + // Extension data (8 bytes) should be stripped from the 61-byte decrypted payload + expect(packet!.payload).toHaveLength(53); + expect(packet!.payload.equals(XCHACHA20_SAMPLE.decrypted.subarray(8))).toEqual(true); + expect(packet!.sequence).toEqual(22_118); + expect(packet!.timestamp).toEqual(3_220_386_864); + expect(packet!.ssrc).toEqual(48_921); }); - test('decrypt: aead_aes256gcm_rtpsize', () => { + test('parsePacket: aead_aes256gcm_rtpsize', () => { const nonceSpace = Buffer.alloc(12); - const decrypted = receiver['decrypt']( + const packet = receiver['parsePacket']( AES256GCM_SAMPLE.encrypted, 'aead_aes256_gcm_rtpsize', nonceSpace, AES256GCM_SAMPLE.key, + '123', + 50_615, ); const expectedNonce = Buffer.concat([ @@ -167,7 +221,11 @@ describe('VoiceReceiver', () => { ]); expect(nonceSpace.equals(expectedNonce)).toEqual(true); - expect(decrypted.equals(AES256GCM_SAMPLE.decrypted)).toEqual(true); + // No extension (X=0), so decrypted payload is the opus frame directly + expect(packet!.payload.equals(AES256GCM_SAMPLE.decrypted)).toEqual(true); + expect(packet!.sequence).toEqual(41_884); + expect(packet!.timestamp).toEqual(2_668_332_016); + expect(packet!.ssrc).toEqual(50_615); }); }); }); diff --git a/packages/voice/src/receive/AudioReceiveStream.ts b/packages/voice/src/receive/AudioReceiveStream.ts index 394b5f9707af..aa3ccc1b0100 100644 --- a/packages/voice/src/receive/AudioReceiveStream.ts +++ b/packages/voice/src/receive/AudioReceiveStream.ts @@ -36,6 +36,48 @@ export interface AudioReceiveStreamOptions extends ReadableOptions { end: EndBehavior; } +/** + * An audio packet containing encoded Opus payload data and key RTP Header metadata. + */ +export class AudioPacket { + /** + * Encoded Opus payload data. + */ + public readonly payload: Buffer; + + /** + * RTP sequence number of this packet (16-bit, wraps at 65535). + */ + public readonly sequence: number; + + /** + * RTP synchronization source identifier for this packet (32-bit). + * A change in SSRC indicates a new RTP stream, so any associated + * decoder should be reset. + */ + public readonly ssrc: number; + + /** + * RTP timestamp of this packet (32-bit, wraps at 2^32 - 1, 48kHz clock). + */ + public readonly timestamp: number; + + /** + * Construct a new AudioPacket. + * + * @param payload - Opus payload + * @param sequence - RTP Sequence Number + * @param timestamp - RTP Timestamp + * @param ssrc - RTP Synchronization Source Identifier + */ + public constructor(payload: Buffer, sequence: number, timestamp: number, ssrc: number) { + this.payload = payload; + this.sequence = sequence; + this.timestamp = timestamp; + this.ssrc = ssrc; + } +} + export function createDefaultAudioReceiveStreamOptions(): AudioReceiveStreamOptions { return { end: { @@ -67,22 +109,22 @@ export class AudioReceiveStream extends Readable { this.end = end; } - public override push(buffer: Buffer | null) { + public override push(packet: AudioPacket | null) { if ( - buffer && + packet && (this.end.behavior === EndBehaviorType.AfterInactivity || (this.end.behavior === EndBehaviorType.AfterSilence && - (buffer.compare(SILENCE_FRAME) !== 0 || this.endTimeout === undefined))) + (packet.payload.compare(SILENCE_FRAME) !== 0 || this.endTimeout === undefined))) ) { this.renewEndTimeout(this.end); } - if (buffer === null) { + if (packet === null) { // null marks EOF for stream process.nextTick(() => this.destroy()); } - return super.push(buffer); + return super.push(packet); } private renewEndTimeout(end: EndBehavior & { duration: number }) { diff --git a/packages/voice/src/receive/VoiceReceiver.ts b/packages/voice/src/receive/VoiceReceiver.ts index 90e4015cb9b1..aedb43e54103 100644 --- a/packages/voice/src/receive/VoiceReceiver.ts +++ b/packages/voice/src/receive/VoiceReceiver.ts @@ -10,12 +10,12 @@ import { methods } from '../util/Secretbox'; import { AudioReceiveStream, createDefaultAudioReceiveStreamOptions, + AudioPacket, type AudioReceiveStreamOptions, } from './AudioReceiveStream'; import { SSRCMap } from './SSRCMap'; import { SpeakingMap } from './SpeakingMap'; -const HEADER_EXTENSION_BYTE = Buffer.from([0xbe, 0xde]); const UNPADDED_NONCE_LENGTH = 4; const AUTH_TAG_LENGTH = 16; @@ -78,18 +78,24 @@ export class VoiceReceiver { } } - private decrypt(buffer: Buffer, mode: string, nonce: Buffer, secretKey: Uint8Array) { + /** + * Decrypt RTP packet payload + * + * @param buffer - RTP packet buffer + * @param mode - cipher mode + * @param nonce - encryption nonce + * @param secretKey - encryption key + * @param headerSize - size of the unencrypted RTP header (fixed header + CSRC + extension header) + * @returns decrypted packet payload + */ + private decrypt(buffer: Buffer, mode: string, nonce: Buffer, secretKey: Uint8Array, headerSize: number) { // Copy the last 4 bytes of unpadded nonce to the padding of (12 - 4) or (24 - 4) bytes buffer.copy(nonce, 0, buffer.length - UNPADDED_NONCE_LENGTH); - let headerSize = 12; - const first = buffer.readUint8(); - if ((first >> 4) & 0x01) headerSize += 4; - - // The unencrypted RTP header contains 12 bytes, HEADER_EXTENSION and the extension size + // The unencrypted RTP header is used as AAD (authenticated but not encrypted) const header = buffer.subarray(0, headerSize); - // Encrypted contains the extension, if any, the opus packet, and the auth tag + // Encrypted contains the extension data, if any, the opus packet, and the auth tag const encrypted = buffer.subarray(headerSize, buffer.length - AUTH_TAG_LENGTH - UNPADDED_NONCE_LENGTH); const authTag = buffer.subarray( buffer.length - AUTH_TAG_LENGTH - UNPADDED_NONCE_LENGTH, @@ -126,35 +132,61 @@ export class VoiceReceiver { /** * Parses an audio packet, decrypting it to yield an Opus packet. * - * @param buffer - The buffer to parse + * @param rtp - The incoming RTP packet buffer to be parsed * @param mode - The encryption mode * @param nonce - The nonce buffer used by the connection for encryption * @param secretKey - The secret key used by the connection for encryption * @param userId - The user id that sent the packet - * @returns The parsed Opus packet + * @param ssrc - already-parsed SSRC (Synchronization Source Identifier) from the RTP Header + * @returns Decrypted Opus payload and RTP header information, or null if DAVE decrypt failed in a way that should be ignored */ - private parsePacket(buffer: Buffer, mode: string, nonce: Buffer, secretKey: Uint8Array, userId: string) { - let packet: Buffer = this.decrypt(buffer, mode, nonce, secretKey); - if (!packet) throw new Error('Failed to parse packet'); - - // Strip decrypted RTP Header Extension if present - // The header is only indicated in the original data, so compare with buffer first - if (buffer.subarray(12, 14).compare(HEADER_EXTENSION_BYTE) === 0) { - const headerExtensionLength = buffer.subarray(14).readUInt16BE(); - packet = packet.subarray(4 * headerExtensionLength); + private parsePacket( + rtp: Buffer, + mode: string, + nonce: Buffer, + secretKey: Uint8Array, + userId: string, + ssrc: number, + ): AudioPacket | null { + // Parse key RTP Header fields + const first = rtp.readUint8(); + const hasHeaderExtension = Boolean((first >> 4) & 0x01); // X field + const cc = first & 0x0f; // CSRC Count field + const sequence = rtp.readUInt16BE(2); + const timestamp = rtp.readUInt32BE(4); + + // Compute unencrypted header size: fixed header + CSRC Identifiers + extension header if present + let headerSize = 12 + 4 * cc; + const extensionHeaderOffset = headerSize; // where the extension header starts, if present + if (hasHeaderExtension) headerSize += 4; // extension header (profile ID + length) + + // Decrypt the RTP Payload + let payload: Buffer = this.decrypt(rtp, mode, nonce, secretKey, headerSize); + if (!payload) throw new Error('Failed to parse packet'); + + // Skip the decrypted RTP Header Extension data if present + if (hasHeaderExtension) { + // Extension Header Length field + const headerExtensionLength = rtp.readUInt16BE(extensionHeaderOffset + 2); + payload = payload.subarray(4 * headerExtensionLength); } - // Decrypt packet if in a DAVE session. + // Decrypt payload if in a DAVE session. if ( this.voiceConnection.state.status === VoiceConnectionStatus.Ready && (this.voiceConnection.state.networking.state.code === NetworkingStatusCode.Ready || this.voiceConnection.state.networking.state.code === NetworkingStatusCode.Resuming) ) { const daveSession = this.voiceConnection.state.networking.state.dave; - if (daveSession) packet = daveSession.decrypt(packet, userId)!; + if (daveSession) { + payload = daveSession.decrypt(payload, userId)!; + + if (!payload) return null; // decryption failed but should be ignored + } } - return packet; + // Construct AudioPacket with Opus payload and RTP header information + return new AudioPacket(payload, sequence, timestamp, ssrc); } /** @@ -164,7 +196,7 @@ export class VoiceReceiver { * @internal */ public onUdpMessage(msg: Buffer) { - if (msg.length <= 8) return; + if (msg.length <= 12) return; const ssrc = msg.readUInt32BE(8); const userData = this.ssrcMap.get(ssrc); @@ -183,6 +215,7 @@ export class VoiceReceiver { this.connectionData.nonceBuffer, this.connectionData.secretKey, userData.userId, + ssrc, ); if (packet) stream.push(packet); } catch (error) {