Skip to content

Commit b3e790c

Browse files
authored
Add multiplication operator and test behavior of aggregations with grouping expressions (#571)
1 parent 6b2a12c commit b3e790c

7 files changed

+369
-28
lines changed

hydroflow/tests/datalog_frontend.rs

+27-1
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,31 @@ fn test_aggregations_and_comments() {
805805
assert_eq!(&res, &[(3, 1), (10, 3)]);
806806
}
807807

808+
#[multiplatform_test]
809+
fn test_aggregations_group_by_expr() {
810+
let (ints_send, ints) = hydroflow::util::unbounded_channel::<(usize, usize)>();
811+
let (result, mut result_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();
812+
813+
let mut flow = datalog!(
814+
r#"
815+
.input ints `source_stream(ints)`
816+
.output result `for_each(|v| result.send(v).unwrap())`
817+
818+
result(a % 2, sum(b)) :- ints(a, b)
819+
"#
820+
);
821+
822+
ints_send.send((1, 1)).unwrap();
823+
ints_send.send((2, 1)).unwrap();
824+
ints_send.send((3, 1)).unwrap();
825+
826+
flow.run_tick();
827+
828+
let mut res = collect_ready::<Vec<_>, _>(&mut result_recv);
829+
res.sort_by_key(|v| v.0);
830+
assert_eq!(&res, &[(0, 1), (1, 2)]);
831+
}
832+
808833
#[multiplatform_test]
809834
fn test_choose_strings() {
810835
let (strings_send, strings) = hydroflow::util::unbounded_channel::<(String,)>();
@@ -870,6 +895,7 @@ fn test_expr_lhs() {
870895
result(a + a) :- ints(a)
871896
result(123 - a) :- ints(a)
872897
result(123 % (a + 5)) :- ints(a)
898+
result(a * 5) :- ints(a)
873899
"#
874900
);
875901

@@ -879,7 +905,7 @@ fn test_expr_lhs() {
879905

880906
assert_eq!(
881907
&*collect_ready::<Vec<_>, _>(&mut result_recv),
882-
&[(123,), (124,), (2,), (122,), (3,)]
908+
&[(123,), (124,), (2,), (122,), (3,), (5,)]
883909
);
884910
}
885911

hydroflow_datalog_core/src/grammar.rs

+11
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,12 @@ pub mod datalog {
255255
#[rust_sitter::leaf(text = "-")] (),
256256
Box<IntExpr>,
257257
),
258+
#[rust_sitter::prec_left(2)]
259+
Mul(
260+
Box<IntExpr>,
261+
#[rust_sitter::leaf(text = "*")] (),
262+
Box<IntExpr>,
263+
),
258264
#[rust_sitter::prec_left(1)]
259265
Mod(
260266
Box<IntExpr>,
@@ -279,6 +285,11 @@ pub mod datalog {
279285
idents.extend(r.idents());
280286
idents
281287
}
288+
IntExpr::Mul(l, _, r) => {
289+
let mut idents = l.idents();
290+
idents.extend(r.idents());
291+
idents
292+
}
282293
IntExpr::Mod(l, _, r) => {
283294
let mut idents = l.idents();
284295
idents.extend(r.idents());

hydroflow_datalog_core/src/lib.rs

+18
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,11 @@ pub(crate) fn gen_value_expr(
490490
let r = gen_value_expr(r, lookup_ident, get_span);
491491
parse_quote!(#l - #r)
492492
}
493+
IntExpr::Mul(l, _, r) => {
494+
let l = gen_value_expr(l, lookup_ident, get_span);
495+
let r = gen_value_expr(r, lookup_ident, get_span);
496+
parse_quote!(#l * #r)
497+
}
493498
IntExpr::Mod(l, _, r) => {
494499
let l = gen_value_expr(l, lookup_ident, get_span);
495500
let r = gen_value_expr(r, lookup_ident, get_span);
@@ -963,6 +968,18 @@ mod tests {
963968
);
964969
}
965970

971+
#[test]
972+
fn test_aggregations_group_by_expr() {
973+
test_snapshots!(
974+
r#"
975+
.input ints `source_stream(ints)`
976+
.output result `for_each(|v| result.send(v).unwrap())`
977+
978+
result(a % 2, sum(b)) :- ints(a, b)
979+
"#
980+
);
981+
}
982+
966983
#[test]
967984
fn test_non_copy_but_clone() {
968985
test_snapshots!(
@@ -987,6 +1004,7 @@ mod tests {
9871004
result(a + a) :- ints(a)
9881005
result(123 - a) :- ints(a)
9891006
result(123 % (a + 5)) :- ints(a)
1007+
result(a * 5) :- ints(a)
9901008
"#
9911009
);
9921010
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
---
2+
source: hydroflow_datalog_core/src/lib.rs
3+
expression: "prettyplease::unparse(&wrapped)"
4+
---
5+
fn main() {
6+
{
7+
use hydroflow::{var_expr, var_args};
8+
let mut df = hydroflow::scheduled::graph::Hydroflow::new();
9+
df.__assign_meta_graph(
10+
"{\"nodes\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":null,\"version\":2},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":{\"Handoff\":{}},\"version\":3},{\"value\":{\"Operator\":\"source_stream (ints)\"},\"version\":1},{\"value\":{\"Operator\":\"for_each (| v | result . send (v) . unwrap ())\"},\"version\":1},{\"value\":{\"Operator\":\"map (| row : (_ , _ ,) | ((row . 0 % 2 ,) , (row . 1 ,)))\"},\"version\":1},{\"value\":{\"Operator\":\"group_by :: < 'tick , (_ ,) , (Option < _ > ,) > (| | (None ,) , | old : & mut (Option < _ > ,) , val : (_ ,) | { old . 0 = if let Some (prev) = old . 0 . take () { Some (prev + val . 0) } else { Some (val . 0) } ; })\"},\"version\":1},{\"value\":{\"Operator\":\"map (| (g , a) | (g . 0 , a . 0 . unwrap () ,))\"},\"version\":1}],\"graph\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1}],\"version\":3},{\"value\":[{\"idx\":6,\"version\":3},{\"idx\":10,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":9,\"version\":1},{\"idx\":6,\"version\":3}],\"version\":3},{\"value\":[{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":3}],\"ports\":[{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":1},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3}],\"node_subgraph\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1}],\"subgraph_nodes\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":1}],\"subgraph_stratum\":[{\"value\":null,\"version\":0},{\"value\":1,\"version\":1},{\"value\":0,\"version\":1}],\"node_varnames\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"ints_insert\",\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"result_insert\",\"version\":1}]}",
11+
);
12+
df.__assign_diagnostics("[]");
13+
let (hoff_6v3_send, hoff_6v3_recv) = df
14+
.make_edge::<
15+
_,
16+
hydroflow::scheduled::handoff::VecHandoff<_>,
17+
>("handoff GraphNodeId(6v3)");
18+
let mut sg_2v1_node_7v1_stream = {
19+
#[inline(always)]
20+
fn check_stream<
21+
Stream: hydroflow::futures::stream::Stream<Item = Item>,
22+
Item,
23+
>(
24+
stream: Stream,
25+
) -> ::std::pin::Pin<
26+
::std::boxed::Box<impl hydroflow::futures::stream::Stream<Item = Item>>,
27+
> {
28+
::std::boxed::Box::pin(stream)
29+
}
30+
check_stream(ints)
31+
};
32+
let sg_2v1_node_2v1_uniquedata = df
33+
.add_state(
34+
::std::cell::RefCell::new(
35+
hydroflow::lang::monotonic_map::MonotonicMap::<
36+
_,
37+
hydroflow::rustc_hash::FxHashSet<_>,
38+
>::default(),
39+
),
40+
);
41+
df.add_subgraph_stratified(
42+
"Subgraph GraphSubgraphId(2v1)",
43+
0,
44+
var_expr!(),
45+
var_expr!(hoff_6v3_send),
46+
move |context, var_args!(), var_args!(hoff_6v3_send)| {
47+
let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(|v| {
48+
hoff_6v3_send.give(Some(v));
49+
});
50+
let op_7v1 = std::iter::from_fn(|| {
51+
match hydroflow::futures::stream::Stream::poll_next(
52+
sg_2v1_node_7v1_stream.as_mut(),
53+
&mut std::task::Context::from_waker(&context.waker()),
54+
) {
55+
std::task::Poll::Ready(maybe) => maybe,
56+
std::task::Poll::Pending => None,
57+
}
58+
});
59+
let op_7v1 = {
60+
#[inline(always)]
61+
pub fn check_op_7v1<Input: ::std::iter::Iterator<Item = Item>, Item>(
62+
input: Input,
63+
) -> impl ::std::iter::Iterator<Item = Item> {
64+
input
65+
}
66+
check_op_7v1(op_7v1)
67+
};
68+
let op_2v1 = op_7v1
69+
.filter(|item| {
70+
let mut borrow = context
71+
.state_ref(sg_2v1_node_2v1_uniquedata)
72+
.borrow_mut();
73+
let set = borrow
74+
.try_insert_with(
75+
(context.current_tick(), context.current_stratum()),
76+
hydroflow::rustc_hash::FxHashSet::default,
77+
);
78+
if !set.contains(item) {
79+
set.insert(::std::clone::Clone::clone(item));
80+
true
81+
} else {
82+
false
83+
}
84+
});
85+
let op_2v1 = {
86+
#[inline(always)]
87+
pub fn check_op_2v1<Input: ::std::iter::Iterator<Item = Item>, Item>(
88+
input: Input,
89+
) -> impl ::std::iter::Iterator<Item = Item> {
90+
input
91+
}
92+
check_op_2v1(op_2v1)
93+
};
94+
let op_9v1 = op_2v1.map(|row: (_, _)| ((row.0 % 2,), (row.1,)));
95+
let op_9v1 = {
96+
#[inline(always)]
97+
pub fn check_op_9v1<Input: ::std::iter::Iterator<Item = Item>, Item>(
98+
input: Input,
99+
) -> impl ::std::iter::Iterator<Item = Item> {
100+
input
101+
}
102+
check_op_9v1(op_9v1)
103+
};
104+
#[inline(always)]
105+
fn check_pivot_run<
106+
Pull: ::std::iter::Iterator<Item = Item>,
107+
Push: hydroflow::pusherator::Pusherator<Item = Item>,
108+
Item,
109+
>(pull: Pull, push: Push) {
110+
hydroflow::pusherator::pivot::Pivot::new(pull, push).run();
111+
}
112+
check_pivot_run(op_9v1, hoff_6v3_send);
113+
},
114+
);
115+
let sg_1v1_node_5v1_uniquedata = df
116+
.add_state(
117+
::std::cell::RefCell::new(
118+
hydroflow::lang::monotonic_map::MonotonicMap::<
119+
_,
120+
hydroflow::rustc_hash::FxHashSet<_>,
121+
>::default(),
122+
),
123+
);
124+
df.add_subgraph_stratified(
125+
"Subgraph GraphSubgraphId(1v1)",
126+
1,
127+
var_expr!(hoff_6v3_recv),
128+
var_expr!(),
129+
move |context, var_args!(hoff_6v3_recv), var_args!()| {
130+
let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap();
131+
let hoff_6v3_recv = hoff_6v3_recv.drain(..);
132+
let op_10v1 = {
133+
#[inline(always)]
134+
fn check_input<Iter: ::std::iter::Iterator<Item = (A, B)>, A, B>(
135+
iter: Iter,
136+
) -> impl ::std::iter::Iterator<Item = (A, B)> {
137+
iter
138+
}
139+
check_input(hoff_6v3_recv)
140+
.fold(
141+
hydroflow::rustc_hash::FxHashMap::<
142+
(_,),
143+
(Option<_>,),
144+
>::default(),
145+
|mut ht, kv| {
146+
let entry = ht.entry(kv.0).or_insert_with(|| (None,));
147+
#[allow(clippy::redundant_closure_call)]
148+
(|old: &mut (Option<_>,), val: (_,)| {
149+
old
150+
.0 = if let Some(prev) = old.0.take() {
151+
Some(prev + val.0)
152+
} else {
153+
Some(val.0)
154+
};
155+
})(entry, kv.1);
156+
ht
157+
},
158+
)
159+
.into_iter()
160+
};
161+
let op_10v1 = {
162+
#[inline(always)]
163+
pub fn check_op_10v1<
164+
Input: ::std::iter::Iterator<Item = Item>,
165+
Item,
166+
>(input: Input) -> impl ::std::iter::Iterator<Item = Item> {
167+
input
168+
}
169+
check_op_10v1(op_10v1)
170+
};
171+
let op_11v1 = op_10v1.map(|(g, a)| (g.0, a.0.unwrap()));
172+
let op_11v1 = {
173+
#[inline(always)]
174+
pub fn check_op_11v1<
175+
Input: ::std::iter::Iterator<Item = Item>,
176+
Item,
177+
>(input: Input) -> impl ::std::iter::Iterator<Item = Item> {
178+
input
179+
}
180+
check_op_11v1(op_11v1)
181+
};
182+
let op_5v1 = op_11v1
183+
.filter(|item| {
184+
let mut borrow = context
185+
.state_ref(sg_1v1_node_5v1_uniquedata)
186+
.borrow_mut();
187+
let set = borrow
188+
.try_insert_with(
189+
(context.current_tick(), context.current_stratum()),
190+
hydroflow::rustc_hash::FxHashSet::default,
191+
);
192+
if !set.contains(item) {
193+
set.insert(::std::clone::Clone::clone(item));
194+
true
195+
} else {
196+
false
197+
}
198+
});
199+
let op_5v1 = {
200+
#[inline(always)]
201+
pub fn check_op_5v1<Input: ::std::iter::Iterator<Item = Item>, Item>(
202+
input: Input,
203+
) -> impl ::std::iter::Iterator<Item = Item> {
204+
input
205+
}
206+
check_op_5v1(op_5v1)
207+
};
208+
let op_8v1 = hydroflow::pusherator::for_each::ForEach::new(|v| {
209+
result.send(v).unwrap()
210+
});
211+
let op_8v1 = {
212+
#[inline(always)]
213+
pub fn check_op_8v1<
214+
Input: hydroflow::pusherator::Pusherator<Item = Item>,
215+
Item,
216+
>(
217+
input: Input,
218+
) -> impl hydroflow::pusherator::Pusherator<Item = Item> {
219+
input
220+
}
221+
check_op_8v1(op_8v1)
222+
};
223+
#[inline(always)]
224+
fn check_pivot_run<
225+
Pull: ::std::iter::Iterator<Item = Item>,
226+
Push: hydroflow::pusherator::Pusherator<Item = Item>,
227+
Item,
228+
>(pull: Pull, push: Push) {
229+
hydroflow::pusherator::pivot::Pivot::new(pull, push).run();
230+
}
231+
check_pivot_run(op_5v1, op_8v1);
232+
},
233+
);
234+
df
235+
}
236+
}
237+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
source: hydroflow_datalog_core/src/lib.rs
3+
expression: flat_graph_ref.surface_syntax_string()
4+
---
5+
2v1 = unique :: < 'tick > ();
6+
5v1 = unique :: < 'tick > ();
7+
7v1 = source_stream (ints);
8+
8v1 = for_each (| v | result . send (v) . unwrap ());
9+
9v1 = map (| row : (_ , _ ,) | ((row . 0 % 2 ,) , (row . 1 ,)));
10+
10v1 = group_by :: < 'tick , (_ ,) , (Option < _ > ,) > (| | (None ,) , | old : & mut (Option < _ > ,) , val : (_ ,) | { old . 0 = if let Some (prev) = old . 0 . take () { Some (prev + val . 0) } else { Some (val . 0) } ; });
11+
11v1 = map (| (g , a) | (g . 0 , a . 0 . unwrap () ,));
12+
13+
7v1 -> 2v1;
14+
11v1 -> 5v1;
15+
5v1 -> 8v1;
16+
10v1 -> 11v1;
17+
9v1 -> 10v1;
18+
2v1 -> 9v1;
19+

0 commit comments

Comments
 (0)