Skip to content

Commit 441101c

Browse files
telecosetemesi254
authored andcommitted
feat(jpeg): add incremental checkpoint mode
1 parent d77ed37 commit 441101c

5 files changed

Lines changed: 205 additions & 21 deletions

File tree

benchmarks/benches/decode_jpeg.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ fn decode_jpeg_opts(buf: &[u8], options: DecoderOptions) -> Vec<u8> {
175175
d.decode().unwrap()
176176
}
177177

178+
fn decode_jpeg_incremental_mode(buf: &[u8]) -> Vec<u8> {
179+
let mut d = JpegDecoder::new(ZCursor::new(buf));
180+
d.set_incremental_mode(true);
181+
182+
d.decode().unwrap()
183+
}
184+
178185
fn decode_no_samp_opts(c: &mut Criterion) {
179186
let a = sample_path().join("test-images/jpeg/benchmarks/speed_bench.jpg");
180187

@@ -336,6 +343,66 @@ fn decode_restart_resume(c: &mut Criterion) {
336343
});
337344
}
338345

346+
fn decode_streaming_mode(c: &mut Criterion) {
347+
let data = read(sample_path().join("test-images/jpeg/benchmarks/speed_bench_hv_subsampling.jpg"))
348+
.unwrap();
349+
let mut group = c.benchmark_group("jpeg: Incremental mode checkpoints");
350+
group.throughput(Throughput::Bytes(data.len() as u64));
351+
352+
group.bench_function("one-shot default", |b| {
353+
b.iter(|| black_box(decode_jpeg(data.as_slice())));
354+
});
355+
356+
group.bench_function("one-shot incremental-mode", |b| {
357+
b.iter(|| black_box(decode_jpeg_incremental_mode(data.as_slice())));
358+
});
359+
360+
let partial = data.len() * 80 / 100;
361+
362+
group.bench_function("first retry default", |b| {
363+
b.iter(|| {
364+
let limit = Rc::new(Cell::new(partial));
365+
let cursor = GrowableCursor::new(data.as_slice(), Rc::clone(&limit));
366+
let mut decoder = JpegDecoder::new(cursor);
367+
decoder.decode_headers().expect("headers should fit in 80%");
368+
let mut out = vec![0u8; decoder.output_buffer_size().unwrap()];
369+
370+
match decoder.decode_into(&mut out) {
371+
Ok(()) => panic!("scan should not complete at 80% visibility"),
372+
Err(e) => assert!(e.is_recoverable_eof(), "got: {e:?}"),
373+
}
374+
375+
limit.set(data.len());
376+
decoder
377+
.decode_into(&mut out)
378+
.expect("scan should complete with full data");
379+
black_box(out);
380+
});
381+
});
382+
383+
group.bench_function("first retry incremental-mode", |b| {
384+
b.iter(|| {
385+
let limit = Rc::new(Cell::new(partial));
386+
let cursor = GrowableCursor::new(data.as_slice(), Rc::clone(&limit));
387+
let mut decoder = JpegDecoder::new(cursor);
388+
decoder.set_incremental_mode(true);
389+
decoder.decode_headers().expect("headers should fit in 80%");
390+
let mut out = vec![0u8; decoder.output_buffer_size().unwrap()];
391+
392+
match decoder.decode_into(&mut out) {
393+
Ok(()) => panic!("scan should not complete at 80% visibility"),
394+
Err(e) => assert!(e.is_recoverable_eof(), "got: {e:?}"),
395+
}
396+
397+
limit.set(data.len());
398+
decoder
399+
.decode_into(&mut out)
400+
.expect("scan should complete with full data");
401+
black_box(out);
402+
});
403+
});
404+
}
405+
339406
criterion_group!(name=benches;
340407
config={
341408
let c = Criterion::default();
@@ -345,6 +412,6 @@ criterion_group!(name=benches;
345412
decode_hv_samp,criterion_benchmark_grayscale,
346413
decode_hv_samp_prog,decode_h_samp_prog,decode_no_samp_prog,decode_v_samp_prog,
347414
decode_no_samp_opts,
348-
decode_restart_full,decode_restart_resume);
415+
decode_restart_full,decode_restart_resume,decode_streaming_mode);
349416

350417
criterion_main!(benches);

crates/zune-jpeg/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ kept across retries. If scan decoding returns recoverable EOF,
4040
`decoded_output_bytes()` and `decoded_scanlines()` report the stable prefix of
4141
the output buffer that can be displayed or copied before retrying.
4242

43+
By default, row checkpoints are recorded only after a previous scan decode
44+
attempt, so one-shot decoding keeps the lowest-overhead path. Call
45+
`set_incremental_mode(true)` before the first `decode_into()` attempt when the
46+
caller expects input to arrive incrementally; this records checkpoints during
47+
the first baseline Huffman scan attempt and can reduce replay work on the next
48+
retry.
49+
4350
```Rust
4451
use zune_core::bytestream::ZCursor;
4552
use zune_jpeg::JpegDecoder;
@@ -57,6 +64,7 @@ loop {
5764
}
5865

5966
let mut pixels = vec![0; decoder.output_buffer_size().unwrap()];
67+
decoder.set_incremental_mode(true);
6068

6169
loop {
6270
match decoder.decode_into(&mut pixels) {

crates/zune-jpeg/src/decoder.rs

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,25 @@ pub struct JpegDecoder<T> {
302302
pub(crate) progressive_mcus_buffer: [Vec<i16>; MAX_COMPONENTS],
303303
/// Whether per-row checkpointing is enabled for the current decode.
304304
///
305-
/// Only `true` on a retry `decode_into` call (when `scan_state` was
306-
/// already `Some` on entry). This keeps the one-shot decode path free
307-
/// of per-row overhead while still enabling fine-grained resume on
308-
/// incremental retries.
305+
/// By default this becomes `true` after a previous scan attempt has run,
306+
/// keeping one-shot decode free of per-row overhead. `incremental_mode`
307+
/// enables the same checkpoints on the first scan attempt for streaming
308+
/// callers.
309309
pub(crate) mcu_checkpoints_enabled: bool,
310+
/// Whether row checkpoints should also be recorded on the first scan
311+
/// decode attempt.
312+
///
313+
/// Disabled by default to keep one-shot decode free of checkpoint work;
314+
/// streaming callers can opt in before `decode_into` to avoid replaying
315+
/// from scan start after the first recoverable scan EOF.
316+
incremental_mode: bool,
317+
/// Whether this decoder has already attempted scan decoding.
318+
///
319+
/// `scan_state` becomes `Some` as soon as headers reach SOS, including
320+
/// after an explicit `decode_headers` call. This flag tracks the narrower
321+
/// condition needed for default checkpoint gating: a previous
322+
/// `decode_into` scan attempt actually ran.
323+
scan_decode_attempted: bool,
310324
/// Scratch buffer that header marker parsers fill with the marker body
311325
/// before mutating decoder state.
312326
///
@@ -541,6 +555,8 @@ where
541555
scan_state: None,
542556
pixels_decoded: 0,
543557
mcu_checkpoints_enabled: false,
558+
incremental_mode: false,
559+
scan_decode_attempted: false,
544560
progressive_mcus_buffer: core::array::from_fn(|_| Vec::new()),
545561
marker_body_scratch: Vec::new()
546562
}
@@ -632,6 +648,33 @@ where
632648
Some(self.pixels_decoded.min(self.output_buffer_size()?))
633649
}
634650

651+
/// Return whether incremental mode is enabled.
652+
///
653+
/// Incremental mode records per-row checkpoints during the first scan
654+
/// decode attempt, allowing a later retry after recoverable EOF to resume
655+
/// from the latest stable row instead of replaying from scan start.
656+
///
657+
/// It is disabled by default so one-shot decoding keeps the lowest
658+
/// overhead path.
659+
#[must_use]
660+
pub const fn incremental_mode(&self) -> bool {
661+
self.incremental_mode
662+
}
663+
664+
/// Enable or disable incremental mode.
665+
///
666+
/// Call this before the first `decode_into` scan attempt when the caller
667+
/// expects input to arrive incrementally. In this mode baseline Huffman
668+
/// single-SOS scans save row checkpoints on the first attempt, trading a
669+
/// small amount of checkpoint work for less replay on the next retry.
670+
///
671+
/// The default is `false`, which preserves the zero-overhead one-shot
672+
/// path and only enables row checkpoints after a previous scan decode
673+
/// attempt has run.
674+
pub fn set_incremental_mode(&mut self, enabled: bool) {
675+
self.incremental_mode = enabled;
676+
}
677+
635678
/// Return the number of output scanlines known to be stable after the
636679
/// most recent `decode_into` attempt.
637680
///
@@ -1238,6 +1281,12 @@ where
12381281
/// from hard failures: `Err(e)` where `e.is_recoverable_eof()` means feed
12391282
/// more input and retry, while any other `Err` is non-recoverable.
12401283
///
1284+
/// By default, row checkpoints are enabled after a previous scan decode
1285+
/// attempt, so the first one-shot decode avoids checkpoint overhead. Call
1286+
/// [`set_incremental_mode`](Self::set_incremental_mode) before the first
1287+
/// scan attempt to record row checkpoints immediately when input is
1288+
/// expected to arrive incrementally.
1289+
///
12411290
/// On success the decoder keeps scan-start replay state, so a later
12421291
/// `decode_into` call is well-defined and produces bit-identical pixels.
12431292
/// Replay re-runs entropy decoding from the first SOS.
@@ -1283,15 +1332,6 @@ where
12831332
pixels_written: usize,
12841333
dc_predictions: [(i32, i32); MAX_COMPONENTS]
12851334
}
1286-
// Enable per-row checkpointing only on retry calls (when scan_state
1287-
// already existed before this decode_into invocation). One-shot
1288-
// decoding skips checkpoint overhead entirely.
1289-
let retrying_scan = self.scan_state.is_some();
1290-
self.mcu_checkpoints_enabled = retrying_scan;
1291-
if !retrying_scan {
1292-
self.pixels_decoded = 0;
1293-
}
1294-
12951335
let scan_plan = self.scan_state.as_deref().map(|state| ScanPlan {
12961336
scan_start_position: state.scan_start_position,
12971337
outer_append_snapshot: state.append_snapshot,
@@ -1388,6 +1428,17 @@ where
13881428
let out_len = core::cmp::min(out.len(), expected_size);
13891429
let out = &mut out[0..out_len];
13901430

1431+
// By default, enable per-row checkpointing only after a previous
1432+
// scan decode attempt has run. Incremental mode opts into the same
1433+
// checkpoints on the first scan attempt so a streaming caller avoids
1434+
// one scan-start replay.
1435+
let previous_scan_attempt = self.scan_decode_attempted;
1436+
self.mcu_checkpoints_enabled = previous_scan_attempt || self.incremental_mode;
1437+
if !previous_scan_attempt {
1438+
self.pixels_decoded = 0;
1439+
}
1440+
self.scan_decode_attempted = true;
1441+
13911442
let result: Result<(), DecodeErrors>;
13921443
if self.is_arithmetic {
13931444
#[cfg(feature = "arith")]

crates/zune-jpeg/src/mcu.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,9 @@ impl<T: ZByteReaderTrait> JpegDecoder<T> {
257257
// Per-row checkpoint: save bitstream state at the start of
258258
// each MCU row so we can resume from here (rather than
259259
// replaying from scan start) if ExhaustedData fires mid-row.
260-
// Only active on retry calls to keep one-shot decode free of
261-
// any overhead (this code is outside the hot MCU inner loop).
260+
// Active on retry calls, or on the first attempt when the
261+
// caller explicitly opted into incremental mode. Default
262+
// one-shot decode keeps this disabled.
262263
//
263264
// Only for single-SOS baseline (all_components_in_first_scan):
264265
// multi-SOS scans can't safely resume mid-scan because later

crates/zune-jpeg/tests/incremental.rs

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,10 +1256,67 @@ fn entropy_start(data: &[u8]) -> usize {
12561256
sos_pos + 2 + sos_len
12571257
}
12581258

1259+
fn first_retry_seek_after_scan_eof(data: &[u8], incremental_mode: bool) -> usize {
1260+
let expected = decode_oneshot(data);
1261+
let entropy_start = entropy_start(data);
1262+
let cutoff = entropy_start + (data.len() - entropy_start) * 60 / 100;
1263+
1264+
let limit = Rc::new(Cell::new(cutoff));
1265+
let seek_log = Rc::new(RefCell::new(Vec::new()));
1266+
let cursor = GrowableCursor::with_seek_log(data, Rc::clone(&limit), Rc::clone(&seek_log));
1267+
let mut decoder = JpegDecoder::new(cursor);
1268+
decoder.set_incremental_mode(incremental_mode);
1269+
1270+
decoder
1271+
.decode_headers()
1272+
.expect("headers should be fully visible at cutoff");
1273+
assert_eq!(decoder.incremental_mode(), incremental_mode);
1274+
let mut out = vec![0u8; decoder.output_buffer_size().unwrap()];
1275+
1276+
let err = decoder
1277+
.decode_into(&mut out)
1278+
.expect_err("truncated scan should give recoverable EOF");
1279+
assert!(
1280+
err.is_recoverable_eof(),
1281+
"expected recoverable EOF, got {err:?}"
1282+
);
1283+
1284+
seek_log.borrow_mut().clear();
1285+
limit.set(data.len());
1286+
decoder
1287+
.decode_into(&mut out)
1288+
.expect("full data should allow decode to complete");
1289+
assert_pixels_match(&out, &expected, "first_retry_seek", data.len());
1290+
1291+
let seeks = seek_log.borrow();
1292+
*seeks
1293+
.first()
1294+
.expect("retry must seek to scan start or a checkpoint")
1295+
}
1296+
1297+
#[test]
1298+
fn incremental_mode_records_checkpoint_on_first_scan_attempt() {
1299+
let data = include_bytes!("../../../test-images/jpeg/sampling_factors.jpg");
1300+
let entropy_start = entropy_start(data);
1301+
1302+
let default_seek = first_retry_seek_after_scan_eof(data, false);
1303+
assert_eq!(
1304+
default_seek, entropy_start,
1305+
"default mode should replay from scan start on the first retry"
1306+
);
1307+
1308+
let incremental_seek = first_retry_seek_after_scan_eof(data, true);
1309+
assert!(
1310+
incremental_seek > entropy_start,
1311+
"incremental mode should resume from an entropy-data row checkpoint; \
1312+
entropy_start={entropy_start}, seek={incremental_seek}"
1313+
);
1314+
}
1315+
12591316
/// Per-row checkpoint: truncating a non-RST image mid-scan and retrying
12601317
/// must eventually resume from a row checkpoint rather than replaying
1261-
/// from scan start. Per-row checkpointing is only enabled on retry calls
1262-
/// (when scan_state already exists), so the sequence is:
1318+
/// from scan start. In default mode, per-row checkpointing is only enabled
1319+
/// after a previous scan decode attempt has run, so the sequence is:
12631320
/// 1. First call: partial data → ExhaustedData (no checkpoints saved)
12641321
/// 2. Second call (retry, checkpoints now enabled): still partial → ExhaustedData
12651322
/// (per-row checkpoints ARE saved this time)
@@ -1309,9 +1366,9 @@ fn per_row_checkpoint_avoids_full_scan_replay() {
13091366
"expected a stable partial prefix, got {first_scanlines} scanlines"
13101367
);
13111368

1312-
// Second attempt — still truncated. Now per-row checkpoints are enabled
1313-
// (scan_state exists from the first call). This replays from scan start
1314-
// and saves per-row checkpoints as it decodes.
1369+
// Second attempt — still truncated. Now a previous scan decode attempt
1370+
// has run, so this replays from scan start and saves per-row checkpoints
1371+
// as it decodes.
13151372
let err = decoder
13161373
.decode_into(&mut out)
13171374
.expect_err("still truncated, should give recoverable EOF again");

0 commit comments

Comments
 (0)