From fa62deeaa3f96f0077f9aa76611746dc3343f137 Mon Sep 17 00:00:00 2001 From: djugei Date: Sun, 13 Jul 2025 09:36:19 +0200 Subject: [PATCH 1/2] fix logic error: only consume logically progresses, not fill_buf --- src/iter.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/iter.rs b/src/iter.rs index 8861db52..8c82379c 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -265,15 +265,12 @@ impl tokio::io::Async { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - let result = Pin::new(&mut this.it).poll_fill_buf(cx); - if let Poll::Ready(Ok(buf)) = &result { - this.progress.inc(buf.len() as u64); - } - result + Pin::new(&mut this.it).poll_fill_buf(cx) } fn consume(mut self: Pin<&mut Self>, amt: usize) { Pin::new(&mut self.it).consume(amt); + self.progress.inc(amt.try_into().unwrap()); } } From 382e1915e1972c46b2e6458b0a64d36bd640bc9f Mon Sep 17 00:00:00 2001 From: djugei Date: Thu, 2 May 2024 11:28:36 +0200 Subject: [PATCH 2/2] Introduce de-jittering for seeks. On the first seek it gets enabled, displaying the progress as the max of the last 10 updates. If there ever are more than 5 consecutive reads and writes without seek it gets disabled again, keeping the performance impact low. --- src/iter.rs | 209 ++++++++++++++++++++++++++++++++++++++++---- src/progress_bar.rs | 7 +- src/rayon.rs | 9 +- 3 files changed, 204 insertions(+), 21 deletions(-) diff --git a/src/iter.rs b/src/iter.rs index 8c82379c..04cd528c 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -62,6 +62,7 @@ where pub struct ProgressBarIter { pub(crate) it: T, pub progress: ProgressBar, + pub(crate) dejitter: MaxSeekHeuristic, } impl ProgressBarIter { @@ -155,25 +156,37 @@ impl FusedIterator for ProgressBarIter {} impl io::Read for ProgressBarIter { fn read(&mut self, buf: &mut [u8]) -> io::Result { let inc = self.it.read(buf)?; - self.progress.inc(inc as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), inc as u64), + ); Ok(inc) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { let inc = self.it.read_vectored(bufs)?; - self.progress.inc(inc as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), inc as u64), + ); Ok(inc) } fn read_to_string(&mut self, buf: &mut String) -> io::Result { let inc = self.it.read_to_string(buf)?; - self.progress.inc(inc as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), inc as u64), + ); Ok(inc) } fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { self.it.read_exact(buf)?; - self.progress.inc(buf.len() as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), buf.len() as u64), + ); Ok(()) } } @@ -185,15 +198,24 @@ impl io::BufRead for ProgressBarIter { fn consume(&mut self, amt: usize) { self.it.consume(amt); - self.progress.inc(amt as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), amt.try_into().unwrap()), + ); } } impl io::Seek for ProgressBarIter { fn seek(&mut self, f: io::SeekFrom) -> io::Result { self.it.seek(f).map(|pos| { - self.progress.set_position(pos); - pos + // this kind of seek is used to find the current position, but does not alter it + // generally equivalent to stream_position() + if let io::SeekFrom::Current(0) = f { + pos + } else { + self.progress.set_position(self.dejitter.update_seek(pos)); + pos + } }) } // Pass this through to preserve optimizations that the inner I/O object may use here @@ -203,6 +225,123 @@ impl io::Seek for ProgressBarIter { } } +/// Calculates a more stable visual position from jittery seeks to show to the user. +/// +/// It does so by holding the maximum position encountered out of the last HISTORY read/write positions. +/// As an optimization it deallocates the history when only sequential operations are performed RESET times in a row. +#[derive(Debug, Default)] +pub(crate) struct MaxSeekHeuristic { + buf: Option<(Box>, u8)>, +} + +impl MaxSeekHeuristic { + fn update_seq(&mut self, prev_pos: u64, delta: u64) -> u64 { + let new_pos = prev_pos + delta; + if let Some((buf, seq)) = &mut self.buf { + *seq += 1; + if *seq >= RESET { + self.buf = None; + return new_pos; + } + + buf.update(new_pos); + buf.max() + } else { + new_pos + } + } + + fn update_seek(&mut self, newpos: u64) -> u64 { + let (b, seq) = self + .buf + .get_or_insert_with(|| (Box::new(MaxRingBuf::::default()), 0)); + *seq = 0; + b.update(newpos); + b.max() + } +} + +/// Ring buffer that remembers the maximum contained value. +/// +/// can be used to quickly calculate the maximum value of a history of data points. +#[derive(Debug)] +struct MaxRingBuf { + history: [u64; HISTORY], + // invariant_h: always a valid index into history + head: u8, + // invariant_m: always a valid index into history + max_pos: u8, +} + +impl MaxRingBuf { + /// Adds a value to the history. + /// Updates internal bookkeeping to remember the maximum value. + /// + /// # Performance: + /// amortized O(1): + /// each regular update is O(1). + /// Only updates that overwrite the position the maximum was stored in with a smaller number do a seek of the buffer, + /// searching for the new maximum. + /// This only happens on average each 1/HISTORY and has a cost of HISTORY, + /// therefore amortizing to O(1). + /// + /// In case there is some linear increase with jitter, + /// as expected in this specific use-case, + /// as long as there is one bigger update each HISTORY updates the scan is never triggered at all. + /// + /// Worst case would be linearly decreasing values, which is still O(1). + fn update(&mut self, new: u64) { + // exploit invariant_h to eliminate bounds checks & panic code path + let head = usize::from(self.head) % self.history.len(); + // exploit invariant_m to eliminate bounds checks & panic code path + let max_pos = usize::from(self.max_pos) % self.history.len(); + + // save max now in case it gets overwritten in the next line + let prev_max = self.history[max_pos]; + self.history[head] = new; + + if new > prev_max { + // This is now the new maximum + self.max_pos = self.head; + } else if self.max_pos == self.head && new < prev_max { + // This was the maximum and may not be anymore + // do a linear seek to find the new maximum + let (idx, _val) = self + .history + .iter() + .enumerate() + .max_by_key(|(_, v)| *v) + .expect("array has fixded size > 0"); + // invariant_m: idx is from an enumeration of history + self.max_pos = idx.try_into().expect("history.len() <= u8::MAX"); + } + + // invariant_h: head is kept in bounds by %-ing with history.len() + // it is a ring buffer so wrapping around is expected behaviour. + self.head = (self.head + 1) % (self.history.len() as u8); + } + + /// Returns the maximum value out of the memorized entries + fn max(&self) -> u64 { + // exploit invariant_m to eliminate bounds checks & panic code path + self.history[self.max_pos as usize % self.history.len()] + } +} + +impl Default for MaxRingBuf { + fn default() -> Self { + assert!(HISTORY <= u8::MAX.into()); + assert!(HISTORY > 0); + Self { + history: [0; HISTORY], + // invariant_h: we asserted that history has at least one element, therefore index 0 is valid + head: 0, + // invariant_m: we asserted that history has at least one element, therefore index 0 is valid + max_pos: 0, + } + } +} + #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] impl tokio::io::AsyncWrite for ProgressBarIter { @@ -213,7 +352,9 @@ impl tokio::io::AsyncWrite for ProgressBarIter ) -> Poll> { Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| { poll.map(|inc| { - self.progress.inc(inc as u64); + let oldprog = self.progress.position(); + let newprog = self.dejitter.update_seq(oldprog, inc.try_into().unwrap()); + self.progress.set_position(newprog); inc }) }) @@ -237,12 +378,14 @@ impl tokio::io::AsyncRead for ProgressBarIter, ) -> Poll> { let prev_len = buf.filled().len() as u64; - if let Poll::Ready(e) = Pin::new(&mut self.it).poll_read(cx, buf) { - self.progress.inc(buf.filled().len() as u64 - prev_len); - Poll::Ready(e) - } else { - Poll::Pending + let poll = Pin::new(&mut self.it).poll_read(cx, buf); + if let Poll::Ready(_e) = &poll { + let inc = buf.filled().len() as u64 - prev_len; + let oldprog = self.progress.position(); + let newprog = self.dejitter.update_seq(oldprog, inc); + self.progress.set_position(newprog); } + poll } } @@ -254,7 +397,13 @@ impl tokio::io::AsyncSeek for ProgressBarIter, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.it).poll_complete(cx) + let poll = Pin::new(&mut self.it).poll_complete(cx); + if let Poll::Ready(Ok(pos)) = &poll { + let newpos = self.dejitter.update_seek(*pos); + self.progress.set_position(newpos); + } + + poll } } @@ -270,7 +419,9 @@ impl tokio::io::Async fn consume(mut self: Pin<&mut Self>, amt: usize) { Pin::new(&mut self.it).consume(amt); - self.progress.inc(amt.try_into().unwrap()); + let oldprog = self.progress.position(); + let newprog = self.dejitter.update_seq(oldprog, amt.try_into().unwrap()); + self.progress.set_position(newprog); } } @@ -297,14 +448,20 @@ impl futures_core::Stream for ProgressBarIter io::Write for ProgressBarIter { fn write(&mut self, buf: &[u8]) -> io::Result { self.it.write(buf).map(|inc| { - self.progress.inc(inc as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), inc as u64), + ); inc }) } fn write_vectored(&mut self, bufs: &[io::IoSlice]) -> io::Result { self.it.write_vectored(bufs).map(|inc| { - self.progress.inc(inc as u64); + self.progress.set_position( + self.dejitter + .update_seq(self.progress.position(), inc as u64), + ); inc }) } @@ -320,7 +477,11 @@ impl io::Write for ProgressBarIter { impl> ProgressIterator for T { fn progress_with(self, progress: ProgressBar) -> ProgressBarIter { - ProgressBarIter { it: self, progress } + ProgressBarIter { + it: self, + progress, + dejitter: MaxSeekHeuristic::default(), + } } } @@ -350,4 +511,16 @@ mod test { v.iter().progress_with_style(style) }); } + + #[test] + fn test_max_ring_buf() { + use crate::iter::MaxRingBuf; + let mut max = MaxRingBuf::<10>::default(); + max.update(100); + assert_eq!(max.max(), 100); + for i in 0..10 { + max.update(99 - i); + } + assert_eq!(max.max(), 99); + } } diff --git a/src/progress_bar.rs b/src/progress_bar.rs index 8f5334e1..39f15a6b 100644 --- a/src/progress_bar.rs +++ b/src/progress_bar.rs @@ -15,7 +15,7 @@ use web_time::Instant; use crate::draw_target::ProgressDrawTarget; use crate::state::{AtomicPosition, BarState, ProgressFinish, Reset, TabExpandedString}; use crate::style::ProgressStyle; -use crate::{ProgressBarIter, ProgressIterator, ProgressState}; +use crate::{iter, ProgressBarIter, ProgressIterator, ProgressState}; /// A progress bar or spinner /// @@ -498,6 +498,7 @@ impl ProgressBar { ProgressBarIter { progress: self.clone(), it: read, + dejitter: iter::MaxSeekHeuristic::default(), } } @@ -519,6 +520,7 @@ impl ProgressBar { ProgressBarIter { progress: self.clone(), it: write, + dejitter: iter::MaxSeekHeuristic::default(), } } @@ -545,6 +547,7 @@ impl ProgressBar { ProgressBarIter { progress: self.clone(), it: write, + dejitter: iter::MaxSeekHeuristic::default(), } } @@ -568,6 +571,7 @@ impl ProgressBar { ProgressBarIter { progress: self.clone(), it: read, + dejitter: iter::MaxSeekHeuristic::default(), } } @@ -590,6 +594,7 @@ impl ProgressBar { ProgressBarIter { progress: self.clone(), it: stream, + dejitter: iter::MaxSeekHeuristic::default(), } } diff --git a/src/rayon.rs b/src/rayon.rs index 1c8e844f..342afece 100644 --- a/src/rayon.rs +++ b/src/rayon.rs @@ -1,7 +1,7 @@ use rayon::iter::plumbing::{Consumer, Folder, Producer, ProducerCallback, UnindexedConsumer}; use rayon::iter::{IndexedParallelIterator, ParallelIterator}; -use crate::{ProgressBar, ProgressBarIter}; +use crate::{iter::MaxSeekHeuristic, ProgressBar, ProgressBarIter}; /// Wraps a Rayon parallel iterator. /// @@ -41,7 +41,11 @@ where impl> ParallelProgressIterator for T { fn progress_with(self, progress: ProgressBar) -> ProgressBarIter { - ProgressBarIter { it: self, progress } + ProgressBarIter { + it: self, + progress, + dejitter: MaxSeekHeuristic::default(), + } } } @@ -99,6 +103,7 @@ impl> Producer for ProgressProducer

{ ProgressBarIter { it: self.base.into_iter(), progress: self.progress, + dejitter: MaxSeekHeuristic::default(), } }