Skip to content

Commit 41b9caa

Browse files
authored
Merge pull request #1 from freddiev4/claude/explore-encoding-parallelism-9IHwU
Parallelize video frame rendering and extraction
2 parents cfb0728 + ae6a47f commit 41b9caa

File tree

2 files changed

+64
-17
lines changed

2 files changed

+64
-17
lines changed

src/video/decoder.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::{Context, Result};
22
use log::info;
3+
use rayon::prelude::*;
34

45
use crate::config::{self, Yts3Config};
56
use crate::video::dct::DctTables;
@@ -63,21 +64,48 @@ impl VideoDecoder {
6364
let stdout = child.stdout.as_mut().unwrap();
6465
let frame_size = self.width as usize * self.height as usize;
6566
let mut all_data = Vec::new();
66-
let mut frame_buf = vec![0u8; frame_size];
6767
let mut frame_count = 0u64;
6868

69+
// Read frames in batches from ffmpeg (I/O must be sequential) and extract
70+
// bits from each batch in parallel. Batch size matches the rayon thread pool
71+
// so all cores stay busy while we keep memory bounded to `threads * frame_size`.
72+
let batch_size = rayon::current_num_threads();
73+
let mut batch: Vec<Vec<u8>> = Vec::with_capacity(batch_size);
74+
6975
loop {
76+
let mut frame_buf = vec![0u8; frame_size];
7077
match read_exact_or_eof(stdout, &mut frame_buf) {
7178
Ok(true) => {
72-
let frame_data = self.extract_frame(&frame_buf);
73-
all_data.extend_from_slice(&frame_data);
79+
batch.push(frame_buf);
7480
frame_count += 1;
81+
82+
if batch.len() >= batch_size {
83+
let extracted: Vec<Vec<u8>> = batch
84+
.par_iter()
85+
.map(|f| self.extract_frame(f))
86+
.collect();
87+
for frame_data in extracted {
88+
all_data.extend_from_slice(&frame_data);
89+
}
90+
batch.clear();
91+
}
7592
}
7693
Ok(false) => break, // EOF
7794
Err(e) => return Err(e.into()),
7895
}
7996
}
8097

98+
// Process any remaining frames in the last (partial) batch
99+
if !batch.is_empty() {
100+
let extracted: Vec<Vec<u8>> = batch
101+
.par_iter()
102+
.map(|f| self.extract_frame(f))
103+
.collect();
104+
for frame_data in extracted {
105+
all_data.extend_from_slice(&frame_data);
106+
}
107+
}
108+
81109
let status = child.wait().context("ffmpeg decode process failed")?;
82110
if !status.success() {
83111
anyhow::bail!("ffmpeg decode exited with status: {}", status);

src/video/encoder.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::process::{Command, Stdio};
33

44
use anyhow::{Context, Result};
55
use log::info;
6+
use rayon::prelude::*;
67

78
use crate::config::{self, Yts3Config};
89
use crate::video::dct::DctTables;
@@ -57,6 +58,10 @@ impl VideoEncoder {
5758
self.fps
5859
);
5960

61+
// Scale FFV1 slice count to available threads for better intra-frame parallelism
62+
// inside ffmpeg. Clamped to 16 (a reasonable FFV1 upper bound).
63+
let ffv1_slices = rayon::current_num_threads().min(16).to_string();
64+
6065
let mut child = Command::new("ffmpeg")
6166
.args([
6267
"-y",
@@ -75,7 +80,7 @@ impl VideoEncoder {
7580
"-level",
7681
"3",
7782
"-slices",
78-
"4",
83+
&ffv1_slices,
7984
"-slicecrc",
8085
"1",
8186
output_path,
@@ -88,19 +93,33 @@ impl VideoEncoder {
8893

8994
let stdin = child.stdin.as_mut().unwrap();
9095

91-
for frame_idx in 0..num_frames {
92-
let data_offset = frame_idx * self.bytes_per_frame;
93-
let data_end = (data_offset + self.bytes_per_frame).min(packet_data.len());
94-
let frame_data = if data_offset < packet_data.len() {
95-
&packet_data[data_offset..data_end]
96-
} else {
97-
&[]
98-
};
99-
100-
let frame_pixels = self.render_frame(frame_data);
101-
stdin
102-
.write_all(&frame_pixels)
103-
.context("failed to write frame data to ffmpeg")?;
96+
// Render frames in parallel batches, then write each batch to ffmpeg in order.
97+
// Batch size matches the rayon thread pool so we keep all cores busy without
98+
// holding more than `threads * frame_size` bytes of rendered pixel data at once.
99+
let batch_size = rayon::current_num_threads();
100+
let mut frame_idx = 0;
101+
while frame_idx < num_frames {
102+
let batch_end = (frame_idx + batch_size).min(num_frames);
103+
let frames: Vec<Vec<u8>> = (frame_idx..batch_end)
104+
.into_par_iter()
105+
.map(|idx| {
106+
let data_offset = idx * self.bytes_per_frame;
107+
let data_end = (data_offset + self.bytes_per_frame).min(packet_data.len());
108+
let frame_data = if data_offset < packet_data.len() {
109+
&packet_data[data_offset..data_end]
110+
} else {
111+
&[]
112+
};
113+
self.render_frame(frame_data)
114+
})
115+
.collect();
116+
117+
for frame_pixels in &frames {
118+
stdin
119+
.write_all(frame_pixels)
120+
.context("failed to write frame data to ffmpeg")?;
121+
}
122+
frame_idx = batch_end;
104123
}
105124

106125
drop(child.stdin.take());

0 commit comments

Comments
 (0)