Open
Description
when going directly from a source_stream
to a for_each
For reasons that are not clear to me, v
fails to type-infer in the first example but the second example has no problem compiling.
#[multiplatform_test]
pub fn test_basic_4_fails() {
let send_recv =
hydroflow::util::unbounded_channel::<Result<(), ()>>();
let mut df = hydroflow_syntax! {
source_stream(send_recv.1)
-> for_each(|v| {
dbg!(v.unwrap());
});
};
}
#[multiplatform_test]
pub fn test_basic_4_compiles() {
let send_recv =
hydroflow::util::unbounded_channel::<Result<(), ()>>();
let mut df = hydroflow_syntax! {
source_stream(send_recv.1)
-> map(|v| v.unwrap())
-> for_each(|v| {
dbg!(v);
});
};
}
tee to a pivot (formerly #1149)
#[multiplatform_test]
pub fn test_fold_zip() {
let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(1..=10);
stream2 = source_iter_delta(3..=5) -> map(Max::new);
sum_of_stream2 = stream2 -> lattice_reduce() -> tee();
filtered_stream1 = stream1
-> [0]filtered_stream2;
sum_of_stream2 -> identity::<Max<_>>() -> [1]filtered_stream2;
filtered_stream2 = zip()
-> filter(|(value, sum_of_stream2)| {
// This is not monotonic.
value <= sum_of_stream2.as_reveal_ref()
})
-> map(|(x, _max)| (context.current_tick(), x))
-> assert_eq([(0, 1), (0, 2), (0, 3), (0, 4), (0, 5)]);
// Optional:
sum_of_stream2
-> map(|x| (context.current_tick(), x.into_reveal()))
-> assert_eq([(0, 3), (0, 4), (0, 5), (0, 6), (0, 7)]);
};
assert_graphvis_snapshots!(df);
df.run_available(); // Should return quickly and not hang
}
A few initial attempts to rearrange codegen to fix the type inference proved fruitless
(This example can work by making the final map |x: Max<_>| ...
)