11#![ expect( missing_docs, clippy:: panic_in_result_fn, reason = "OK in tests." ) ]
22
33use orcapod:: {
4- core:: operator:: { JoinOperator , MapOperator , Operator as _ } ,
4+ core:: operator:: { JoinOperator , MapOperator , Operator } ,
55 uniffi:: {
66 error:: Result ,
77 model:: packet:: { Blob , BlobKind , Packet , PathSet , URI } ,
@@ -23,6 +23,17 @@ fn make_packet_key(key_name: String, filepath: String) -> (String, PathSet) {
2323 )
2424}
2525
26+ async fn next_batch (
27+ operator : impl Operator ,
28+ packets : Vec < ( String , Packet ) > ,
29+ ) -> Result < Vec < Packet > > {
30+ let mut next_packets = vec ! [ ] ;
31+ for ( stream_name, packet) in packets {
32+ next_packets. extend ( operator. next ( stream_name, packet) . await ?) ;
33+ }
34+ Ok ( next_packets)
35+ }
36+
2637#[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
2738async fn join_once ( ) -> Result < ( ) > {
2839 let operator = JoinOperator :: new ( 2 ) ;
@@ -55,7 +66,7 @@ async fn join_once() -> Result<()> {
5566 input_streams. extend ( right_stream) ;
5667
5768 assert_eq ! (
58- operator . next ( input_streams) . await ?,
69+ next_batch ( operator , input_streams) . await ?,
5970 vec![
6071 Packet :: from( [
6172 make_packet_key( "subject" . into( ) , "left/subject0.png" . into( ) ) ,
@@ -94,35 +105,35 @@ async fn join_spotty() -> Result<()> {
94105
95106 assert_eq ! (
96107 operator
97- . next( vec! [ (
108+ . next(
98109 "right" . into( ) ,
99- Packet :: from( [ make_packet_key( "style" . into( ) , "right/style0.t7" . into( ) , ) ] ) ,
100- ) ] )
110+ Packet :: from( [ make_packet_key( "style" . into( ) , "right/style0.t7" . into( ) ) ] )
111+ )
101112 . await ?,
102113 vec![ ] ,
103114 "Unexpected streams."
104115 ) ;
105116
106117 assert_eq ! (
107118 operator
108- . next( vec! [ (
119+ . next(
109120 "right" . into( ) ,
110- Packet :: from( [ make_packet_key( "style" . into( ) , "right/style1.t7" . into( ) , ) ] ) ,
111- ) ] )
121+ Packet :: from( [ make_packet_key( "style" . into( ) , "right/style1.t7" . into( ) ) ] )
122+ )
112123 . await ?,
113124 vec![ ] ,
114125 "Unexpected streams."
115126 ) ;
116127
117128 assert_eq ! (
118129 operator
119- . next( vec! [ (
130+ . next(
120131 "left" . into( ) ,
121132 Packet :: from( [ make_packet_key(
122133 "subject" . into( ) ,
123- "left/subject0.png" . into( ) ,
124- ) ] ) ,
125- ) ] )
134+ "left/subject0.png" . into( )
135+ ) ] )
136+ )
126137 . await ?,
127138 vec![
128139 Packet :: from( [
@@ -138,21 +149,21 @@ async fn join_spotty() -> Result<()> {
138149 ) ;
139150
140151 assert_eq ! (
141- operator
142- . next (
143- ( 1 ..3 )
144- . map( |i| {
145- (
146- "left" . into( ) ,
147- Packet :: from( [ make_packet_key(
148- "subject" . into( ) ,
149- format!( "left/subject{i}.png" ) ,
150- ) ] ) ,
151- )
152- } )
153- . collect:: <Vec <_>>( )
154- )
155- . await ?,
152+ next_batch (
153+ operator ,
154+ ( 1 ..3 )
155+ . map( |i| {
156+ (
157+ "left" . into( ) ,
158+ Packet :: from( [ make_packet_key(
159+ "subject" . into( ) ,
160+ format!( "left/subject{i}.png" ) ,
161+ ) ] ) ,
162+ )
163+ } )
164+ . collect:: <Vec <_>>( )
165+ )
166+ . await ?,
156167 vec![
157168 Packet :: from( [
158169 make_packet_key( "subject" . into( ) , "left/subject1.png" . into( ) ) ,
@@ -183,13 +194,13 @@ async fn map_once() -> Result<()> {
183194
184195 assert_eq ! (
185196 operator
186- . next( vec! [ (
197+ . next(
187198 "parent" . into( ) ,
188199 Packet :: from( [
189200 make_packet_key( "key_old" . into( ) , "some/key.txt" . into( ) ) ,
190201 make_packet_key( "subject" . into( ) , "some/subject.txt" . into( ) ) ,
191202 ] ) ,
192- ) ] )
203+ )
193204 . await ?,
194205 vec![ Packet :: from( [
195206 make_packet_key( "key_new" . into( ) , "some/key.txt" . into( ) ) ,
0 commit comments