Skip to content

Modernize Cursor API #596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions differential-dataflow/src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,11 +661,11 @@ where

let mut thinker = JoinThinker::new();

while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel {
while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), effort < *fuel) {

match trace.key(trace_storage).cmp(&batch.key(batch_storage)) {
Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
match trace_key.cmp(&batch_key) {
Ordering::Less => trace.seek_key(trace_storage, batch_key),
Ordering::Greater => batch.seek_key(batch_storage, trace_key),
Ordering::Equal => {

use crate::IntoOwned;
Expand All @@ -679,8 +679,7 @@ where

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2| {
let key = batch.key(batch_storage);
logic(key, v1, v2, &t, r1, r2, &mut session);
logic(batch_key, v1, v2, &t, r1, r2, &mut session);
});

// TODO: Effort isn't perfectly tracked as we might still have some data in the
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ impl<'a, C: Cursor> EditList<'a, C> {
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.clear();
while cursor.val_valid(storage) {
while let Some(val) = cursor.get_val(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned()));
self.seal(cursor.val(storage));
self.seal(val);
cursor.step_val(storage);
}
}
Expand Down
93 changes: 56 additions & 37 deletions differential-dataflow/src/trace/cursor/cursor_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,58 +26,68 @@ impl<C: Cursor> CursorList<C> {
result
}

// Initialize min_key with the indices of cursors with the minimum key.
//
// This method scans the current keys of each cursor, and tracks the indices
// of cursors whose key equals the minimum valid key seen so far. As it goes,
// if it observes an improved key it clears the current list, updates the
// minimum key, and continues.
//
// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
// in a consistent state as well.
/// Initialize min_key with the indices of cursors with the minimum key.
///
/// This method scans the current keys of each cursor, and tracks the indices
/// of cursors whose key equals the minimum valid key seen so far. As it goes,
/// if it observes an improved key it clears the current list, updates the
/// minimum key, and continues.
///
/// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
/// in a consistent state as well.
fn minimize_keys(&mut self, storage: &[C::Storage]) {

self.min_key.clear();

// Determine the index of the cursor with minimum key.
let mut min_key_opt = None;
for (index, cursor) in self.cursors.iter().enumerate() {
let key = cursor.get_key(&storage[index]);
if key.is_some() {
if min_key_opt.is_none() || key.lt(&min_key_opt) {
min_key_opt = key;
self.min_key.clear();
}
if key.eq(&min_key_opt) {
self.min_key.push(index);
// We'll visit each non-`None` key, maintaining the indexes of the least keys in `self.min_key`.
let mut iter = self.cursors.iter().enumerate().flat_map(|(idx, cur)| cur.get_key(&storage[idx]).map(|key| (idx, key)));
if let Some((idx, key)) = iter.next() {
let mut min_key = key;
self.min_key.push(idx);
for (idx, key) in iter {
match key.cmp(&min_key) {
std::cmp::Ordering::Less => {
self.min_key.clear();
self.min_key.push(idx);
min_key = key;
}
std::cmp::Ordering::Equal => {
self.min_key.push(idx);
}
std::cmp::Ordering::Greater => { }
}
}
}

self.minimize_vals(storage);
}

// Initialize min_val with the indices of minimum key cursors with the minimum value.
//
// This method scans the current values of cursor with minimum keys, and tracks the
// indices of cursors whose value equals the minimum valid value seen so far. As it
// goes, if it observes an improved value it clears the current list, updates the minimum
// value, and continues.
/// Initialize min_val with the indices of minimum key cursors with the minimum value.
///
/// This method scans the current values of cursor with minimum keys, and tracks the
/// indices of cursors whose value equals the minimum valid value seen so far. As it
/// goes, if it observes an improved value it clears the current list, updates the minimum
/// value, and continues.
fn minimize_vals(&mut self, storage: &[C::Storage]) {

self.min_val.clear();

// Determine the index of the cursor with minimum value.
let mut min_val = None;
for &index in self.min_key.iter() {
let val = self.cursors[index].get_val(&storage[index]);
if val.is_some() {
if min_val.is_none() || val.lt(&min_val) {
min_val = val;
self.min_val.clear();
}
if val.eq(&min_val) {
self.min_val.push(index);
// We'll visit each non-`None` value, maintaining the indexes of the least values in `self.min_val`.
let mut iter = self.min_key.iter().cloned().flat_map(|idx| self.cursors[idx].get_val(&storage[idx]).map(|val| (idx, val)));
if let Some((idx, val)) = iter.next() {
let mut min_val = val;
self.min_val.push(idx);
for (idx, val) in iter {
match val.cmp(&min_val) {
std::cmp::Ordering::Less => {
self.min_val.clear();
self.min_val.push(idx);
min_val = val;
}
std::cmp::Ordering::Equal => {
self.min_val.push(idx);
}
std::cmp::Ordering::Greater => { }
}
}
}
Expand Down Expand Up @@ -114,6 +124,15 @@ impl<C: Cursor> Cursor for CursorList<C> {
debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]]));
self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
}
/// Returns the current key, or `None` when `min_key` holds no cursor index.
///
/// The key is read from the cursor whose index is stored first in `min_key`,
/// paired with the storage entry at that same index.
#[inline]
fn get_key<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Key<'a>> {
    // `first()` is the idiomatic form of `get(0)`; destructuring the index
    // avoids dereferencing it twice.
    self.min_key
        .first()
        .map(|&idx| self.cursors[idx].key(&storage[idx]))
}
#[inline]
fn get_val<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Val<'a>> {
self.min_val.get(0).map(|idx| self.cursors[*idx].val(&storage[*idx]))
}

#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
for &index in self.min_val.iter() {
Expand Down
14 changes: 5 additions & 9 deletions differential-dataflow/src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,9 @@ pub trait Cursor {
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a>;

/// Returns a reference to the current key, if valid.
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
if self.key_valid(storage) { Some(self.key(storage)) } else { None }
}
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>>;
/// Returns a reference to the current value, if valid.
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
if self.val_valid(storage) { Some(self.val(storage)) } else { None }
}
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>>;

/// Applies `logic` to each pair of time and difference. Intended for mutation of the
/// closure's scope.
Expand Down Expand Up @@ -85,14 +81,14 @@ pub trait Cursor {
{
let mut out = Vec::new();
self.rewind_keys(storage);
while self.key_valid(storage) {
while let Some(key) = self.get_key(storage) {
self.rewind_vals(storage);
while self.val_valid(storage) {
while let Some(val) = self.get_val(storage) {
let mut kv_out = Vec::new();
self.map_times(storage, |ts, r| {
kv_out.push((ts.into_owned(), r.into_owned()));
});
out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out));
out.push(((key.into_owned(), val.into_owned()), kv_out));
self.step_val(storage);
}
self.step_key(storage);
Expand Down
12 changes: 12 additions & 0 deletions differential-dataflow/src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,15 @@ pub mod containers {

/// Reference to the element at this position.
fn index(&self, index: usize) -> Self::ReadItem<'_>;

/// Checked counterpart of `index`.
///
/// Yields `Some(self.index(index))` when `index < self.len()`, and `None`
/// for any other position.
fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
    (index < self.len()).then(|| self.index(index))
}

/// Number of contained elements
fn len(&self) -> usize;
/// Returns the last item if the container is non-empty.
Expand Down Expand Up @@ -559,6 +568,9 @@ pub mod containers {
/// Reference to the element at `index`, obtained by direct indexing.
fn index(&self, index: usize) -> Self::ReadItem<'_> {
    &self[index]
}
/// Reference to the element at `index`, or `None` if the slice lookup finds nothing there.
fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
    // `self` is already a reference and coerces to `&[T]`; borrowing it
    // again (`&self`) only adds a layer the compiler has to peel off.
    <[T]>::get(self, index)
}
/// Number of contained elements, measured on the full slice view.
fn len(&self) -> usize {
    self[..].len()
}
Expand Down
6 changes: 6 additions & 0 deletions differential-dataflow/src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ mod val_batch {

type Storage = OrdValBatch<L>;

/// Key at `key_cursor` as reported by the key container's checked `get`.
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
    storage.storage.keys.get(self.key_cursor)
}
/// Current value when `val_valid` holds for this cursor, otherwise `None`.
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
    self.val_valid(storage).then(|| self.val(storage))
}

/// Key at `key_cursor`, indexed directly without consulting `key_valid`.
fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> {
    storage.storage.keys.index(self.key_cursor)
}
/// Value at `val_cursor`, indexed directly without consulting `val_valid`.
fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> {
    storage.storage.vals.index(self.val_cursor)
}
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
Expand Down Expand Up @@ -997,6 +1000,9 @@ mod key_batch {

type Storage = OrdKeyBatch<L>;

/// Key at `key_cursor` as reported by the key container's checked `get`.
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
    storage.storage.keys.get(self.key_cursor)
}
/// The unit value when `val_valid` holds for this cursor, otherwise `None`.
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> {
    self.val_valid(storage).then_some(&())
}

/// Key at `key_cursor`, indexed directly without consulting `key_valid`.
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
    storage.storage.keys.index(self.key_cursor)
}
/// Always the unit value; the storage argument is not consulted.
fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () {
    &()
}
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,9 @@ mod val_batch {

type Storage = RhhValBatch<L>;

fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
storage.storage.keys.index(self.key_cursor)
}
/// Key at `key_cursor` as reported by the key container's checked `get`.
fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> {
    storage.storage.keys.get(self.key_cursor)
}
/// Value at `val_cursor` via the checked `get`, but only once `val_valid` holds.
fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> {
    if !self.val_valid(storage) {
        return None;
    }
    storage.storage.vals.get(self.val_cursor)
}
/// Key at `key_cursor`, indexed directly without consulting `key_valid`.
fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
    storage.storage.keys.index(self.key_cursor)
}
/// Value at `val_cursor`, indexed directly without consulting `val_valid`.
fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> {
    storage.storage.vals.index(self.val_cursor)
}
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
Expand Down
3 changes: 3 additions & 0 deletions differential-dataflow/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ pub mod rc_blanket_impls {
/// Current key, forwarded from the wrapped cursor.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
/// Current value, forwarded from the wrapped cursor.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

/// Optional current key, forwarded unchanged from the wrapped cursor's `get_key`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
/// Optional current value, forwarded unchanged from the wrapped cursor's `get_val`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
Expand Down
6 changes: 6 additions & 0 deletions differential-dataflow/src/trace/wrappers/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ where
/// Current key, forwarded from the wrapped cursor.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
/// Current value, forwarded from the wrapped cursor.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

/// Optional current key, forwarded unchanged from the wrapped cursor's `get_key`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
/// Optional current value, forwarded unchanged from the wrapped cursor's `get_val`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
use crate::IntoOwned;
Expand Down Expand Up @@ -238,6 +241,9 @@ where
/// Current key, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
/// Current value, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }

/// Optional current key, forwarded from the wrapped cursor's `get_key` on `storage.batch`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
/// Optional current value, forwarded from the wrapped cursor's `get_val` on `storage.batch`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }

#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
use crate::IntoOwned;
Expand Down
6 changes: 6 additions & 0 deletions differential-dataflow/src/trace/wrappers/enter_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ where
/// Current key, forwarded from the wrapped cursor.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
/// Current value, forwarded from the wrapped cursor.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

/// Optional current key, forwarded unchanged from the wrapped cursor's `get_key`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
/// Optional current value, forwarded unchanged from the wrapped cursor's `get_val`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let key = self.key(storage);
Expand Down Expand Up @@ -270,6 +273,9 @@ where
/// Current key, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
/// Current value, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }

/// Optional current key, forwarded from the wrapped cursor's `get_key` on `storage.batch`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
/// Optional current value, forwarded from the wrapped cursor's `get_val` on `storage.batch`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }

#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let key = self.key(storage);
Expand Down
6 changes: 6 additions & 0 deletions differential-dataflow/src/trace/wrappers/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ where
/// Current key, forwarded from the wrapped cursor.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
/// Current value, forwarded from the wrapped cursor.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

/// Optional current key, forwarded unchanged from the wrapped cursor's `get_key`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
/// Optional current value, forwarded unchanged from the wrapped cursor's `get_val`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
let key = self.key(storage);
Expand Down Expand Up @@ -203,6 +206,9 @@ where
/// Current key, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
/// Current value, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }

// Declared key-then-value, matching the order used by the other cursor wrappers.
/// Optional current key, forwarded from the wrapped cursor's `get_key` on `storage.batch`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
/// Optional current value, forwarded from the wrapped cursor's `get_val` on `storage.batch`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }

#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
let key = self.key(storage);
Expand Down
6 changes: 6 additions & 0 deletions differential-dataflow/src/trace/wrappers/freeze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ where
/// Current key, forwarded from the wrapped cursor.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
/// Current value, forwarded from the wrapped cursor.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

/// Optional current key, forwarded unchanged from the wrapped cursor's `get_key`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
/// Optional current value, forwarded unchanged from the wrapped cursor's `get_val`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let func = &self.func;
self.cursor.map_times(storage, |time, diff| {
Expand Down Expand Up @@ -251,6 +254,9 @@ where
/// Current key, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
/// Current value, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }

/// Optional current key, forwarded from the wrapped cursor's `get_key` on `storage.batch`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
/// Optional current value, forwarded from the wrapped cursor's `get_val` on `storage.batch`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }

#[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let func = &self.func;
self.cursor.map_times(&storage.batch, |time, diff| {
Expand Down
6 changes: 6 additions & 0 deletions differential-dataflow/src/trace/wrappers/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
/// Current key, forwarded from the wrapped cursor.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
/// Current value, forwarded from the wrapped cursor.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

/// Optional current key, forwarded unchanged from the wrapped cursor's `get_key`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
/// Optional current value, forwarded unchanged from the wrapped cursor's `get_val`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let since = self.since.borrow();
Expand Down Expand Up @@ -206,6 +209,9 @@ where
/// Current key, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
/// Current value, read by the wrapped cursor from the inner `storage.batch`.
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }

/// Optional current key, forwarded from the wrapped cursor's `get_key` on `storage.batch`.
#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
/// Optional current value, forwarded from the wrapped cursor's `get_val` on `storage.batch`.
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }

#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let since = self.since.borrow();
Expand Down