diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 560c1d913..68da44d5a 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -661,11 +661,11 @@ where let mut thinker = JoinThinker::new(); - while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { + while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), effort < *fuel) { - match trace.key(trace_storage).cmp(&batch.key(batch_storage)) { - Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)), - Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), + match trace_key.cmp(&batch_key) { + Ordering::Less => trace.seek_key(trace_storage, batch_key), + Ordering::Greater => batch.seek_key(batch_storage, trace_key), Ordering::Equal => { use crate::IntoOwned; @@ -679,8 +679,7 @@ where // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { - let key = batch.key(batch_storage); - logic(key, v1, v2, &t, r1, r2, &mut session); + logic(batch_key, v1, v2, &t, r1, r2, &mut session); }); // TODO: Effort isn't perfectly tracked as we might still have some data in the diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 615cfa399..ab875738f 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -45,9 +45,9 @@ impl<'a, C: Cursor> EditList<'a, C> { L: Fn(C::TimeGat<'_>)->C::Time, { self.clear(); - while cursor.val_valid(storage) { + while let Some(val) = cursor.get_val(storage) { cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned())); - self.seal(cursor.val(storage)); + self.seal(val); cursor.step_val(storage); } } diff --git a/differential-dataflow/src/trace/cursor/cursor_list.rs b/differential-dataflow/src/trace/cursor/cursor_list.rs index f396b67a0..2a175cef0 100644 --- a/differential-dataflow/src/trace/cursor/cursor_list.rs +++ b/differential-dataflow/src/trace/cursor/cursor_list.rs @@ -26,30 +26,35 @@ impl CursorList { result } - // Initialize min_key with the indices of cursors with the minimum key. - // - // This method scans the current keys of each cursor, and tracks the indices - // of cursors whose key equals the minimum valid key seen so far. As it goes, - // if it observes an improved key it clears the current list, updates the - // minimum key, and continues. - // - // Once finished, it invokes `minimize_vals()` to ensure the value cursor is - // in a consistent state as well. + /// Initialize min_key with the indices of cursors with the minimum key. + /// + /// This method scans the current keys of each cursor, and tracks the indices + /// of cursors whose key equals the minimum valid key seen so far. As it goes, + /// if it observes an improved key it clears the current list, updates the + /// minimum key, and continues. + /// + /// Once finished, it invokes `minimize_vals()` to ensure the value cursor is + /// in a consistent state as well. fn minimize_keys(&mut self, storage: &[C::Storage]) { self.min_key.clear(); - // Determine the index of the cursor with minimum key. - let mut min_key_opt = None; - for (index, cursor) in self.cursors.iter().enumerate() { - let key = cursor.get_key(&storage[index]); - if key.is_some() { - if min_key_opt.is_none() || key.lt(&min_key_opt) { - min_key_opt = key; - self.min_key.clear(); - } - if key.eq(&min_key_opt) { - self.min_key.push(index); + // We'll visit each non-`None` key, maintaining the indexes of the least keys in `self.min_key`. + let mut iter = self.cursors.iter().enumerate().flat_map(|(idx, cur)| cur.get_key(&storage[idx]).map(|key| (idx, key))); + if let Some((idx, key)) = iter.next() { + let mut min_key = key; + self.min_key.push(idx); + for (idx, key) in iter { + match key.cmp(&min_key) { + std::cmp::Ordering::Less => { + self.min_key.clear(); + self.min_key.push(idx); + min_key = key; + } + std::cmp::Ordering::Equal => { + self.min_key.push(idx); + } + std::cmp::Ordering::Greater => { } } } } @@ -57,27 +62,32 @@ impl CursorList { self.minimize_vals(storage); } - // Initialize min_val with the indices of minimum key cursors with the minimum value. - // - // This method scans the current values of cursor with minimum keys, and tracks the - // indices of cursors whose value equals the minimum valid value seen so far. As it - // goes, if it observes an improved value it clears the current list, updates the minimum - // value, and continues. + /// Initialize min_val with the indices of minimum key cursors with the minimum value. + /// + /// This method scans the current values of cursor with minimum keys, and tracks the + /// indices of cursors whose value equals the minimum valid value seen so far. As it + /// goes, if it observes an improved value it clears the current list, updates the minimum + /// value, and continues. fn minimize_vals(&mut self, storage: &[C::Storage]) { self.min_val.clear(); - // Determine the index of the cursor with minimum value. - let mut min_val = None; - for &index in self.min_key.iter() { - let val = self.cursors[index].get_val(&storage[index]); - if val.is_some() { - if min_val.is_none() || val.lt(&min_val) { - min_val = val; - self.min_val.clear(); - } - if val.eq(&min_val) { - self.min_val.push(index); + // We'll visit each non-`None` value, maintaining the indexes of the least values in `self.min_val`. + let mut iter = self.min_key.iter().cloned().flat_map(|idx| self.cursors[idx].get_val(&storage[idx]).map(|val| (idx, val))); + if let Some((idx, val)) = iter.next() { + let mut min_val = val; + self.min_val.push(idx); + for (idx, val) in iter { + match val.cmp(&min_val) { + std::cmp::Ordering::Less => { + self.min_val.clear(); + self.min_val.push(idx); + min_val = val; + } + std::cmp::Ordering::Equal => { + self.min_val.push(idx); + } + std::cmp::Ordering::Greater => { } } } } @@ -114,6 +124,15 @@ impl Cursor for CursorList { debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]])); self.cursors[self.min_val[0]].val(&storage[self.min_val[0]]) } + #[inline] + fn get_key<'a>(&self, storage: &'a Vec) -> Option> { + self.min_key.get(0).map(|idx| self.cursors[*idx].key(&storage[*idx])) + } + #[inline] + fn get_val<'a>(&self, storage: &'a Vec) -> Option> { + self.min_val.get(0).map(|idx| self.cursors[*idx].val(&storage[*idx])) + } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Vec, mut logic: L) { for &index in self.min_val.iter() { diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index e53c0425a..dfcab95f3 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -50,13 +50,9 @@ pub trait Cursor { fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a>; /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { - if self.key_valid(storage) { Some(self.key(storage)) } else { None } - } + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option>; /// Returns a reference to the current value, if valid. - fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { - if self.val_valid(storage) { Some(self.val(storage)) } else { None } - } + fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option>; /// Applies `logic` to each pair of time and difference. Intended for mutation of the /// closure's scope. @@ -85,14 +81,14 @@ pub trait Cursor { { let mut out = Vec::new(); self.rewind_keys(storage); - while self.key_valid(storage) { + while let Some(key) = self.get_key(storage) { self.rewind_vals(storage); - while self.val_valid(storage) { + while let Some(val) = self.get_val(storage) { let mut kv_out = Vec::new(); self.map_times(storage, |ts, r| { kv_out.push((ts.into_owned(), r.into_owned())); }); - out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); + out.push(((key.into_owned(), val.into_owned()), kv_out)); self.step_val(storage); } self.step_key(storage); diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index f62dcf765..717c9df68 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -483,6 +483,15 @@ pub mod containers { /// Reference to the element at this position. fn index(&self, index: usize) -> Self::ReadItem<'_>; + + /// Reference to the element at this position, if it exists. + fn get(&self, index: usize) -> Option> { + if index < self.len() { + Some(self.index(index)) + } + else { None } + } + /// Number of contained elements fn len(&self) -> usize; /// Returns the last item if the container is non-empty. @@ -559,6 +568,9 @@ pub mod containers { fn index(&self, index: usize) -> Self::ReadItem<'_> { &self[index] } + fn get(&self, index: usize) -> Option> { + <[T]>::get(&self, index) + } fn len(&self) -> usize { self[..].len() } diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 2fcf4be3e..550f0d646 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -493,6 +493,9 @@ mod val_batch { type Storage = OrdValBatch; + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { storage.storage.keys.get(self.key_cursor) } + fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } } + fn key<'a>(&self, storage: &'a OrdValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a OrdValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch, mut logic: L2) { @@ -997,6 +1000,9 @@ mod key_batch { type Storage = OrdKeyBatch; + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { storage.storage.keys.get(self.key_cursor) } + fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } } + fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) { diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index d1197bf90..48ce98899 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -666,9 +666,9 @@ mod val_batch { type Storage = RhhValBatch; - fn key<'a>(&self, storage: &'a RhhValBatch) -> Self::Key<'a> { - storage.storage.keys.index(self.key_cursor) - } + fn get_key<'a>(&self, storage: &'a RhhValBatch) -> Option> { storage.storage.keys.get(self.key_cursor) } + fn get_val<'a>(&self, storage: &'a RhhValBatch) -> Option> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } } + fn key<'a>(&self, storage: &'a RhhValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a RhhValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 337076e18..198f99bb6 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -403,6 +403,9 @@ pub mod rc_blanket_impls { #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index a4e0b5ca3..418b3e573 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -184,6 +184,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } + #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { use crate::IntoOwned; @@ -238,6 +241,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } + #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { use crate::IntoOwned; diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index bde170090..2f27fb70e 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -211,6 +211,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } + #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { let key = self.key(storage); @@ -270,6 +273,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } + #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { let key = self.key(storage); diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index f43ff0dc1..1fe95b7ff 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -148,6 +148,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); @@ -203,6 +206,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index 6a39fa0b0..0bc67b626 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -199,6 +199,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(storage, |time, diff| { @@ -251,6 +254,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(&storage.batch, |time, diff| { diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index 34f4da143..2330cc043 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -143,6 +143,9 @@ impl Cursor for CursorFrontier { #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let since = self.since.borrow(); @@ -206,6 +209,9 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } + #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } + #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let since = self.since.borrow();