11use crate :: uniffi:: { error:: Result , model:: packet:: Packet } ;
2+ use async_trait;
23use itertools:: Itertools as _;
3- use std:: {
4- clone:: Clone ,
5- collections:: HashMap ,
6- iter:: IntoIterator ,
7- sync:: { Arc , Mutex } ,
8- } ;
4+ use std:: { clone:: Clone , collections:: HashMap , iter:: IntoIterator , sync:: Arc } ;
5+ use tokio:: sync:: Mutex ;
96
7+ #[ async_trait:: async_trait]
108pub trait Operator {
11- fn next ( & self , packets : Vec < ( String , Packet ) > ) -> Result < Vec < Packet > > ;
9+ async fn next ( & self , stream_name : String , packet : Packet ) -> Result < Vec < Packet > > ;
1210}
1311
1412pub struct JoinOperator {
1513 parent_count : usize ,
16- received_streams : Arc < Mutex < HashMap < String , Vec < Packet > > > > ,
14+ received_packets : Arc < Mutex < HashMap < String , Vec < Packet > > > > ,
1715}
1816
1917impl JoinOperator {
2018 pub fn new ( parent_count : usize ) -> Self {
2119 Self {
2220 parent_count,
23- received_streams : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
21+ received_packets : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
2422 }
2523 }
2624}
2725
28- #[ expect ( clippy :: excessive_nesting , reason = "Nesting manageable." ) ]
26+ #[ async_trait :: async_trait ]
2927impl Operator for JoinOperator {
30- fn next ( & self , packets : Vec < ( String , Packet ) > ) -> Result < Vec < Packet > > {
31- let mut next_packets = vec ! [ ] ;
32- for ( stream, packet) in & packets {
33- let mut received_streams = self . received_streams . lock ( ) ?;
34- if self . parent_count - usize:: from ( !received_streams. contains_key ( stream) )
35- == received_streams. len ( )
28+ async fn next ( & self , stream_name : String , packet : Packet ) -> Result < Vec < Packet > > {
29+ let mut received_packets = self . received_packets . lock ( ) . await ;
30+ received_packets
31+ . entry ( stream_name. clone ( ) )
32+ . or_default ( )
33+ . push ( packet. clone ( ) ) ;
34+ Ok (
35+ if self . parent_count - usize:: from ( !received_packets. contains_key ( & stream_name) )
36+ == received_packets. len ( )
3637 {
37- let packets_to_multiplex = received_streams
38+ let packets_to_multiplex = received_packets
3839 . iter ( )
3940 . filter_map ( |( parent_stream, parent_packets) | {
40- ( parent_stream != stream ) . then_some ( parent_packets. clone ( ) )
41+ ( parent_stream != & stream_name ) . then_some ( parent_packets. clone ( ) )
4142 } )
42- . chain ( vec ! [ vec![ packet. clone( ) ] ] . into_iter ( ) ) ;
43- let current_packets = packets_to_multiplex. multi_cartesian_product ( ) . map (
44- |packet_combinations_to_merge| {
43+ . chain ( vec ! [ vec![ packet. clone( ) ] ] . into_iter ( ) )
44+ . collect :: < Vec < _ > > ( ) ;
45+ drop ( received_packets) ;
46+
47+ packets_to_multiplex
48+ . into_iter ( )
49+ . multi_cartesian_product ( )
50+ . map ( |packet_combinations_to_merge| {
4551 packet_combinations_to_merge
4652 . into_iter ( )
4753 . flat_map ( IntoIterator :: into_iter)
4854 . collect :: < HashMap < _ , _ > > ( )
49- } ,
50- ) ;
51- next_packets. extend ( current_packets) ;
52- }
53- received_streams
54- . entry ( stream. clone ( ) )
55- . or_default ( )
56- . push ( packet. clone ( ) ) ;
57- }
58- Ok ( next_packets)
55+ } )
56+ . collect ( )
57+ } else {
58+ vec ! [ ]
59+ } ,
60+ )
5961 }
6062}
6163
@@ -69,23 +71,21 @@ impl MapOperator {
6971 }
7072}
7173
74+ #[ async_trait:: async_trait]
7275impl Operator for MapOperator {
73- fn next ( & self , packets : Vec < ( String , Packet ) > ) -> Result < Vec < Packet > > {
74- Ok ( packets
75- . iter ( )
76- . map ( |( _, packet) | {
77- packet
78- . iter ( )
79- . map ( |( packet_key, path_set) | {
80- (
81- self . map
82- . get ( packet_key)
83- . map_or_else ( || packet_key. clone ( ) , Clone :: clone) ,
84- path_set. clone ( ) ,
85- )
86- } )
87- . collect ( )
88- } )
89- . collect ( ) )
76+ async fn next ( & self , _: String , packet : Packet ) -> Result < Vec < Packet > > {
77+ Ok ( vec ! [
78+ packet
79+ . iter( )
80+ . map( |( packet_key, path_set) | {
81+ (
82+ self . map
83+ . get( packet_key)
84+ . map_or_else( || packet_key. clone( ) , Clone :: clone) ,
85+ path_set. clone( ) ,
86+ )
87+ } )
88+ . collect( ) ,
89+ ] )
9090 }
9191}
0 commit comments