Skip to content

Commit bde9f5b

Browse files
committed
Encourage Cursor::{get_key, get_val}; Update CursorList
1 parent 25bc4a1 commit bde9f5b

File tree

4 files changed

+58
-47
lines changed

4 files changed

+58
-47
lines changed

differential-dataflow/src/operators/join.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -661,11 +661,12 @@ where
661661

662662
let mut thinker = JoinThinker::new();
663663

664-
while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel {
664+
while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), effort < *fuel) {
665+
// while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel {
665666

666-
match trace.key(trace_storage).cmp(&batch.key(batch_storage)) {
667-
Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
668-
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
667+
match trace_key.cmp(&batch_key) {
668+
Ordering::Less => trace.seek_key(trace_storage, batch_key),
669+
Ordering::Greater => batch.seek_key(batch_storage, trace_key),
669670
Ordering::Equal => {
670671

671672
use crate::IntoOwned;
@@ -679,7 +680,7 @@ where
679680

680681
// populate `temp` with the results in the best way we know how.
681682
thinker.think(|v1,v2,t,r1,r2| {
682-
let key = batch.key(batch_storage);
683+
let key = batch_key;
683684
logic(key, v1, v2, &t, r1, r2, &mut session);
684685
});
685686

differential-dataflow/src/operators/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ impl<'a, C: Cursor> EditList<'a, C> {
4545
L: Fn(C::TimeGat<'_>)->C::Time,
4646
{
4747
self.clear();
48-
while cursor.val_valid(storage) {
48+
while let Some(val) = cursor.get_val(storage) {
4949
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned()));
50-
self.seal(cursor.val(storage));
50+
self.seal(val);
5151
cursor.step_val(storage);
5252
}
5353
}

differential-dataflow/src/trace/cursor/cursor_list.rs

+47-37
Original file line numberDiff line numberDiff line change
@@ -26,58 +26,68 @@ impl<C: Cursor> CursorList<C> {
2626
result
2727
}
2828

29-
// Initialize min_key with the indices of cursors with the minimum key.
30-
//
31-
// This method scans the current keys of each cursor, and tracks the indices
32-
// of cursors whose key equals the minimum valid key seen so far. As it goes,
33-
// if it observes an improved key it clears the current list, updates the
34-
// minimum key, and continues.
35-
//
36-
// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
37-
// in a consistent state as well.
29+
/// Initialize min_key with the indices of cursors with the minimum key.
30+
///
31+
/// This method scans the current keys of each cursor, and tracks the indices
32+
/// of cursors whose key equals the minimum valid key seen so far. As it goes,
33+
/// if it observes an improved key it clears the current list, updates the
34+
/// minimum key, and continues.
35+
///
36+
/// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
37+
/// in a consistent state as well.
3838
fn minimize_keys(&mut self, storage: &[C::Storage]) {
3939

4040
self.min_key.clear();
4141

42-
// Determine the index of the cursor with minimum key.
43-
let mut min_key_opt = None;
44-
for (index, cursor) in self.cursors.iter().enumerate() {
45-
let key = cursor.get_key(&storage[index]);
46-
if key.is_some() {
47-
if min_key_opt.is_none() || key.lt(&min_key_opt) {
48-
min_key_opt = key;
49-
self.min_key.clear();
50-
}
51-
if key.eq(&min_key_opt) {
52-
self.min_key.push(index);
42+
// We'll visit each non-`None` key, maintaining the indexes of the least keys in `self.min_key`.
43+
let mut iter = self.cursors.iter().enumerate().flat_map(|(idx, cur)| cur.get_key(&storage[idx]).map(|key| (idx, key)));
44+
if let Some((idx, key)) = iter.next() {
45+
let mut min_key = key;
46+
self.min_key.push(idx);
47+
for (idx, key) in iter {
48+
match key.cmp(&min_key) {
49+
std::cmp::Ordering::Less => {
50+
self.min_key.clear();
51+
self.min_key.push(idx);
52+
min_key = key;
53+
}
54+
std::cmp::Ordering::Equal => {
55+
self.min_key.push(idx);
56+
}
57+
std::cmp::Ordering::Greater => { }
5358
}
5459
}
5560
}
5661

5762
self.minimize_vals(storage);
5863
}
5964

60-
// Initialize min_val with the indices of minimum key cursors with the minimum value.
61-
//
62-
// This method scans the current values of cursor with minimum keys, and tracks the
63-
// indices of cursors whose value equals the minimum valid value seen so far. As it
64-
// goes, if it observes an improved value it clears the current list, updates the minimum
65-
// value, and continues.
65+
/// Initialize min_val with the indices of minimum key cursors with the minimum value.
66+
///
67+
/// This method scans the current values of cursor with minimum keys, and tracks the
68+
/// indices of cursors whose value equals the minimum valid value seen so far. As it
69+
/// goes, if it observes an improved value it clears the current list, updates the minimum
70+
/// value, and continues.
6671
fn minimize_vals(&mut self, storage: &[C::Storage]) {
6772

6873
self.min_val.clear();
6974

70-
// Determine the index of the cursor with minimum value.
71-
let mut min_val = None;
72-
for &index in self.min_key.iter() {
73-
let val = self.cursors[index].get_val(&storage[index]);
74-
if val.is_some() {
75-
if min_val.is_none() || val.lt(&min_val) {
76-
min_val = val;
77-
self.min_val.clear();
78-
}
79-
if val.eq(&min_val) {
80-
self.min_val.push(index);
75+
// We'll visit each non-`None` value, maintaining the indexes of the least values in `self.min_val`.
76+
let mut iter = self.min_key.iter().cloned().flat_map(|idx| self.cursors[idx].get_val(&storage[idx]).map(|val| (idx, val)));
77+
if let Some((idx, val)) = iter.next() {
78+
let mut min_val = val;
79+
self.min_val.push(idx);
80+
for (idx, val) in iter {
81+
match val.cmp(&min_val) {
82+
std::cmp::Ordering::Less => {
83+
self.min_val.clear();
84+
self.min_val.push(idx);
85+
min_val = val;
86+
}
87+
std::cmp::Ordering::Equal => {
88+
self.min_val.push(idx);
89+
}
90+
std::cmp::Ordering::Greater => { }
8191
}
8292
}
8393
}

differential-dataflow/src/trace/cursor/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ pub trait Cursor {
8181
{
8282
let mut out = Vec::new();
8383
self.rewind_keys(storage);
84-
while self.key_valid(storage) {
84+
while let Some(key) = self.get_key(storage) {
8585
self.rewind_vals(storage);
86-
while self.val_valid(storage) {
86+
while let Some(val) = self.get_val(storage) {
8787
let mut kv_out = Vec::new();
8888
self.map_times(storage, |ts, r| {
8989
kv_out.push((ts.into_owned(), r.into_owned()));
9090
});
91-
out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out));
91+
out.push(((key.into_owned(), val.into_owned()), kv_out));
9292
self.step_val(storage);
9393
}
9494
self.step_key(storage);

0 commit comments

Comments
 (0)