Skip to content

Commit 2983cfa

Browse files
committed
Add AV1 support. Demo saves AV1 to a .obu file
1 parent 1f9db8a commit 2983cfa

3 files changed

Lines changed: 331 additions & 2 deletions

File tree

examples/demo.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
// Used to connect to Wowza Demo URL but they have taken it away, and the replacement URL on their web site does not work.
1515

16-
const { RTSPClient, H264Transport, H265Transport, AACTransport } = require("../dist");
16+
const { RTSPClient, H264Transport, H265Transport, AV1Transport, AACTransport } = require("../dist");
1717
const fs = require("fs");
1818
const { exit } = require("process");
1919
const { program } = require("commander");
@@ -22,7 +22,7 @@ program.name("demo");
2222
program.description("Yellowstone RTSP Client Test Software");
2323
program.option('-u, --username <value>', 'Optional RTSP Username');
2424
program.option('-p, --password <value>', 'Optional RTSP Password');
25-
program.option('-o, --outfile <value>', 'Optional Output File with no File Extension for captured H264/H265/AAC');
25+
program.option('-o, --outfile <value>', 'Optional Output File with no File Extension for captured H264/H265/AV1/AAC');
2626

2727
program.argument('<rtsp url eg rtsp://1.2.3.4/stream1>');
2828

@@ -74,6 +74,12 @@ client.connect(url, { connection: "tcp", secure: false })
7474
// This class subscribes to the client 'data' event, looking for the video payload
7575
const h265 = new H265Transport(client, videoFile, details);
7676
}
77+
if (details.codec == "AV1") {
78+
const videoFile = fs.createWriteStream(filename + '.obu');
79+
// Step 4: Create AV1Transport passing in the client, file, and details
80+
// This class subscribes to the client 'data' event, looking for the video payload
81+
const av1 = new AV1Transport(client, videoFile, details);
82+
}
7783
if (details.codec == "AAC") {
7884
const audioFile = fs.createWriteStream(filename + '.aac');
7985
// Add AAC Transport

lib/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import H264Transport from "./transports/H264Transport";
22
import H265Transport from "./transports/H265Transport";
3+
import AV1Transport from "./transports/AV1Transport";
34
import AACTransport from "./transports/AACTransport";
45
import ONVIFMetadataTransport from "./transports/ONVIFMetadataTransport";
56
import ONVIFClient from "./ONVIFClient";
@@ -9,6 +10,7 @@ import {RTPPacket, RTCPPacket} from "./util";
910
export {
1011
H264Transport,
1112
H265Transport,
13+
AV1Transport,
1214
AACTransport,
1315
ONVIFMetadataTransport,
1416
ONVIFClient,

lib/transports/AV1Transport.ts

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
// Handle AV1 Video
2+
// Process SDP and RTP packets
3+
// De-packetize RTP packets to re-create AV1 OBUs
4+
// Write AV1 OBUs to a .obu file which can be played with "ffplay"
5+
//
6+
// By Roger Hardiman, May 2025
7+
8+
import RTSPClient from "../RTSPClient";
9+
import { RTPPacket } from "../util";
10+
11+
import * as transform from "sdp-transform";
12+
import { Writable } from "stream";
13+
14+
interface Details {
15+
codec: string
16+
mediaSource: transform.MediaDescription
17+
rtpChannel: number,
18+
rtcpChannel: number
19+
}
20+
21+
export default class AV1Transport {
22+
client: RTSPClient;
23+
stream: Writable;
24+
25+
rtpPackets: Buffer[] = [];
26+
waitingForSequenceHeader = true; // used when writing .obu file as 'ffplay' does not like it if the first OBUs are not TD then SH
27+
28+
constructor(client: RTSPClient, stream: Writable, details: Details) {
29+
this.client = client;
30+
this.stream = stream;
31+
32+
// process 'fmtp'
33+
this.processConnectionDetails(details);
34+
35+
client.on("data", (channel, data, packet) => {
36+
if (channel == details.rtpChannel) {
37+
this.processRTPPacket(packet);
38+
}
39+
});
40+
41+
}
42+
43+
processConnectionDetails(details: Details): void {
44+
// There is no Sequence Header (the extra_data / parameter set) in the SDP of AV1
45+
// and currently we have no use for profile, level-idx or tier
46+
const fmtp = (details.mediaSource.fmtp)[0];
47+
48+
if (!fmtp) {
49+
return;
50+
}
51+
52+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
53+
const fmtpConfig = transform.parseParams(fmtp.config);
54+
55+
/*
56+
const _profile = fmtpConfig['profile'].toString();
57+
const _level_idx = fmtpConfig['level-idx'].toString();
58+
const _tier = fmtpConfig['tier'].toString();
59+
*/
60+
}
61+
62+
processRTPPacket(packet: RTPPacket): void {
63+
// Accumatate RTP packets
64+
this.rtpPackets.push(packet.payload);
65+
66+
// When Marker is set to 1 pass the group of packets to processRTPFrame()
67+
if (packet.marker == 1) {
68+
this.processRTPFrame(this.rtpPackets);
69+
this.rtpPackets = [];
70+
}
71+
}
72+
73+
processRTPFrame(rtpPackets: Buffer[]): void {
74+
const obus: Buffer[] = []; // the OBUs from the RTSP server, which normally come without their length bytes (leb128)
75+
76+
for (let i = 0; i < rtpPackets.length; i++) {
77+
78+
// The RTP packet can contain more than one OBU element
79+
80+
// Examine the first byte of the RTP data, the Aggregation Header.
81+
// Z = 1 Indicates that the first OBU element in this RTP packet is a contination of the last OBU element from the last packet (ie fragmentation)
82+
// Y = 1 Indicates that the last OBU element in this RTP packet will be fragmented and will continue in the next RTP packet (so next RTP packet will have Z=1)
83+
84+
// W = Number of OBU elements in this RTP Packet, or 0 if the number of OBUs is not given
85+
// If W = 0, all OBU elements are prefixed with a LEB128 length.
86+
// If W > 0, the OBU elements _except the last one_ have a LEB128 length prefix. Last OBU has no LEB128 length prefix. It can be computed from the RTP payload size
87+
88+
// N = 1 Indicates first packet of a Coded Video Sequence
89+
90+
// 0 1 2 3 4 5 6 7
91+
// +-+-+-+-+-+-+-+-+
92+
// |Z|Y| W |N|-|-|-|
93+
// +-+-+-+-+-+-+-+-+
94+
const packet = rtpPackets[i];
95+
96+
let ptr = 0;
97+
const aggregation_header = packet[ptr];
98+
ptr++
99+
const aggregation_header_z_bit = (aggregation_header >> 7) & 0x01;
100+
//const aggregation_header_y_bit = (aggregation_header >> 6) & 0x01;
101+
const aggregation_header_w = (aggregation_header >> 4) & 0x03;
102+
//const aggregation_header_n_bit = (aggregation_header >> 3) & 0x01;
103+
104+
/*
105+
if (aggregation_header_z_bit == 1) {
106+
console.log("AV1 Z Fragmentation");
107+
}
108+
if (aggregation_header_y_bit == 1) {
109+
console.log("AV1 Y Fragmentation");
110+
}
111+
if (aggregation_header_n_bit == 1) {
112+
console.log("AV1 N Bit is set");
113+
}
114+
*/
115+
116+
let obuCount = 0;
117+
118+
// Loop over each OBU
119+
while (ptr < packet.length) {
120+
121+
obuCount++;
122+
123+
// Check if the OBU element will be prefixed with a LEB128 length
124+
let hasLeb128Prefix = false;
125+
if (aggregation_header_w == 0) hasLeb128Prefix = true;
126+
if (aggregation_header_w != 0 && obuCount != aggregation_header_w) hasLeb128Prefix = true;
127+
128+
let obu_element_size = 0;
129+
if (hasLeb128Prefix) {
130+
for (let i = 0; i < 8; i++) { // max 8 bytes
131+
const lebByte = packet[ptr];
132+
ptr++;
133+
134+
obu_element_size = obu_element_size + ((lebByte & 0x7F) << (i * 7));
135+
136+
if ((lebByte & 0x80) == 0) {
137+
// finished
138+
break;
139+
}
140+
}
141+
} else {
142+
// no LEB128. Size is the remaining bytes
143+
obu_element_size = packet.length - ptr;
144+
}
145+
146+
// Extract the OBU
147+
let obu = packet.slice(ptr, ptr + obu_element_size);
148+
ptr = ptr + obu_element_size;
149+
150+
// Check Z bit. If Z = 1 we need to append the new OBU data to the Partial OBU data (fragmented data) from the last RTP packet
151+
if (aggregation_header_z_bit == 1 && obuCount == 1) {
152+
// Pop off the last 'partial' OBU
153+
const lastPartialObu = obus.pop();
154+
if (lastPartialObu == undefined) {
155+
// error. We do not have any partial data to append this new OBU data to so drop the new OBU.
156+
} else {
157+
const combinedObuData = Buffer.concat([
158+
lastPartialObu,
159+
obu
160+
]);
161+
obu = combinedObuData;
162+
}
163+
}
164+
165+
// We have an OBU so store it
166+
// Note if 'Y' is set, and we are processing the last OBU in the RTP packet, the data will be only part of the fragmented data
167+
// but we don't check the Y bit. We rely on the Z bit being set to 1 in the next RTP packet
168+
if (obu.length > 0) {
169+
obus.push(obu);
170+
}
171+
} // Ptr now parsed all OBU elements in this RTP packet
172+
173+
174+
} // end for-each RTP packet in the Frame
175+
176+
177+
// Write out all the OBUs
178+
// When we write to a File, we need to add the Temporal Delimiter (TD)
179+
// and then the SEQUENCE_HEADER (SH)
180+
// and then the other OBUs.
181+
// The OBUs are modified to include a LEB128 size as required in the AV1 File Format Spec Section 5 file format
182+
// The modification is needed as the AV1 RTSP Spec strips out the OBU lengths and replaces them with OBU Prefix lengths
183+
// which come before the OBU instead of inside the OBU.
184+
// There is an Annex B format that keeps the length bytes as prefixes on the OBU (instead of inside them) but I've not implemented that
185+
186+
// Check if this Frame includes a Sequence Header
187+
if (this.waitingForSequenceHeader) {
188+
for (const obu of obus) {
189+
const obuHeader = obu[0];
190+
const obu_type = (obuHeader >> 3) & 0x0F;
191+
if (obu_type == 1) { // Sequence Header
192+
this.waitingForSequenceHeader = false;
193+
break;
194+
}
195+
}
196+
}
197+
198+
if (this.waitingForSequenceHeader) {
199+
// drop this RTP frame
200+
console.log("AV1: Waiting for Sequence Header")
201+
}
202+
else
203+
{
204+
// Write the OBUs
205+
const temporalDelimiter = Buffer.from([0x12, 0x00]);
206+
this.stream.write(temporalDelimiter);
207+
208+
for (const obu of obus) {
209+
210+
// Take a look at the OBU and see what it contains to verify it looks correct
211+
// OBU Header Byte
212+
// --------------------------------------------------------------------------------
213+
// | 7 | 6,5,4,3 | 2 | 1 | 0 |
214+
// |1 bit forbidden|4 bit OBU Type|1 bit hasExtension|1 bit hasSize|1 bit reserved|
215+
// --------------------------------------------------------------------------------
216+
if (obu.length > 0) {
217+
const obuHeader = obu[0];
218+
//const forbidden_bit = (obuHeader >> 7) & 0x01;
219+
const obu_type = (obuHeader >> 3) & 0x0F;
220+
const extension_bit = (obuHeader >> 2) & 0x01;
221+
const size_bit = (obuHeader >> 1) & 0x01;
222+
223+
/*
224+
const obu_name = this.GetOBUName(obu_type);
225+
console.log("Found AV1 OBU:" + obu_name);
226+
227+
if (forbidden_bit == 1) {
228+
console.log("OBU Forbidden Bit Error");
229+
}
230+
231+
if (obu_name == "Reserved") {
232+
console.log("OBU Type Error");
233+
}
234+
*/
235+
236+
if (this.waitingForSequenceHeader) {
237+
if (obu_type == 1) {
238+
this.waitingForSequenceHeader = false;
239+
240+
// Write the First TD
241+
242+
243+
} else {
244+
// we are still waiting so drop this OBU
245+
console.log("AV1 file writing: Dropping OBU while waiting for Sequence Header")
246+
continue;
247+
}
248+
}
249+
250+
251+
// In order to write to a .obu file, we have to ensure there is a LEB128 Size after the OBU Header Byte (and Optional Extension Byte)
252+
// The LEB128 length gets stripped out in RTP packets
253+
if (size_bit == 0) {
254+
let size = 0;
255+
if (extension_bit == 0) size = obu.length - 1; // -1 for the OBU header
256+
if (extension_bit == 1) size = obu.length - 2; // -2 for the OBU header and the Header Extension Byte
257+
258+
// Convert the Size into a LEB128 byte sequence
259+
const leb128_bytes = [];
260+
while (size > 0) {
261+
const lower_7_bits = (size & 0x7F);
262+
if (size <= 127) {
263+
leb128_bytes.push(lower_7_bits); // leave msbit as 0
264+
} else {
265+
leb128_bytes.push(0x80 + lower_7_bits); // set msbit to 1
266+
}
267+
size = (size >> 7);
268+
}
269+
const leb128Buffer = Buffer.from(leb128_bytes);
270+
271+
// Insert the leb128 size into the OBU
272+
const header_and_extention_len = (extension_bit == 0 ? 1 : 2); // length of header PLUS extension
273+
// Insert the LEB128 length
274+
const newObu = Buffer.concat([
275+
obu.slice(0, 0 + header_and_extention_len),
276+
leb128Buffer,
277+
obu.slice(header_and_extention_len,obu.length)
278+
]);
279+
// Set the hasSize flag to '1' in the OBU Header
280+
newObu[0] = newObu[0] | 0x02;
281+
282+
// WRITE DATA
283+
this.stream.write(newObu);
284+
}
285+
else
286+
{
287+
// This OBU came with a LEB128 length. The AV1 RTSP Spec says the RTSP server should strip them out, but this
288+
// handles the case where a RTSP Server leaves them in
289+
// WRITE DATA
290+
this.stream.write(obu);
291+
}
292+
}
293+
} // for-each OBU in OBUs Arrau
294+
}
295+
}
296+
297+
298+
GetOBUName(obu_type: number): string {
299+
switch (obu_type)
300+
{
301+
case 0: return "Reserved";
302+
case 1: return "SEQUENCE_HEADER";
303+
case 2: return "TEMPORAL_DELIMITER";
304+
case 3: return "FRAME_HEADER";
305+
case 4: return "TILE_GROUP";
306+
case 5: return "METADATA";
307+
case 6: return "FRAME";
308+
case 7: return "REDUNDANT_FRAME_HEADER";
309+
case 8: return "TILE_LIST";
310+
case 9: return "Reserved";
311+
case 10: return "Reserved";
312+
case 11: return "Reserved";
313+
case 12: return "Reserved";
314+
case 13: return "Reserved";
315+
case 14: return "Reserved";
316+
case 15: return "PADDING";
317+
}
318+
319+
return "Error getting OBU Type";
320+
}
321+
}

0 commit comments

Comments
 (0)