1- use std:: { io, net:: IpAddr , time:: Duration } ;
1+ use std:: { io, net:: IpAddr , time:: Duration , u64 } ;
22
33use futures:: StreamExt ;
44use prost:: Message ;
@@ -7,7 +7,7 @@ use tokio_util::{codec::LengthDelimitedCodec, udp::UdpFramed};
77
88use crate :: {
99 config:: ConnectionMode ,
10- streamer:: { AudioPacketMessage , CHECK_1 , CHECK_2 , DEFAULT_PC_PORT , MAX_PORT , WriteError } ,
10+ streamer:: { AudioPacketMessage , DEFAULT_PC_PORT , MAX_PORT , WriteError } ,
1111} ;
1212
1313use super :: { AudioPacketMessageOrdered , AudioStream , ConnectError , StreamerMsg , StreamerTrait } ;
@@ -20,13 +20,9 @@ pub struct UdpStreamer {
2020 ip : IpAddr ,
2121 pub port : u16 ,
2222 stream_config : AudioStream ,
23- state : UdpStreamerState ,
2423 framed : UdpFramed < LengthDelimitedCodec > ,
25- }
26-
27- enum UdpStreamerState {
28- Listening ,
29- Streaming { tracked_sequence : u32 } ,
24+ is_listening : bool ,
25+ tracked_sequence : u32 ,
3026}
3127
3228pub async fn new ( ip : IpAddr , stream_config : AudioStream ) -> Result < UdpStreamer , ConnectError > {
@@ -54,7 +50,8 @@ pub async fn new(ip: IpAddr, stream_config: AudioStream) -> Result<UdpStreamer,
5450 ip,
5551 port : addr. port ( ) ,
5652 stream_config,
57- state : UdpStreamerState :: Listening ,
53+ tracked_sequence : 0 ,
54+ is_listening : true ,
5855 framed : UdpFramed :: new ( socket, LengthDelimitedCodec :: new ( ) ) ,
5956 } ;
6057
@@ -67,124 +64,89 @@ impl StreamerTrait for UdpStreamer {
6764 }
6865
6966 fn status ( & self ) -> StreamerMsg {
70- match self . state {
71- UdpStreamerState :: Listening => StreamerMsg :: Listening {
67+ if self . is_listening {
68+ StreamerMsg :: Listening {
7269 ip : Some ( self . ip ) ,
7370 port : Some ( self . port ) ,
74- } ,
75- UdpStreamerState :: Streaming { .. } => StreamerMsg :: Connected {
71+ }
72+ } else {
73+ StreamerMsg :: Connected {
7674 ip : Some ( self . ip ) ,
7775 port : Some ( self . port ) ,
7876 mode : ConnectionMode :: Udp ,
79- } ,
77+ }
8078 }
8179 }
8280
8381 async fn next ( & mut self ) -> Result < Option < StreamerMsg > , ConnectError > {
84- match & mut self . state {
85- UdpStreamerState :: Listening => {
86- let mut buf1 = [ 0u8 ; CHECK_1 . len ( ) ] ;
87-
88- match self . framed . get_ref ( ) . recv_from ( & mut buf1) . await {
89- Ok ( ( _, src_addr) ) => {
90- if buf1 != CHECK_1 . as_bytes ( ) {
91- let s = String :: from_utf8_lossy ( & buf1) ;
92-
93- return Err ( ConnectError :: HandShakeFailed2 ( format ! (
94- "{} != {}" ,
95- CHECK_1 , s
96- ) ) ) ;
97- }
82+ match tokio:: time:: timeout (
83+ Duration :: from_secs ( if self . is_listening { u64:: MAX } else { 1 } ) ,
84+ self . framed . next ( ) ,
85+ )
86+ . await
87+ {
88+ Ok ( res) => match res {
89+ Some ( Ok ( ( frame, addr) ) ) => {
90+ match AudioPacketMessageOrdered :: decode ( frame) {
91+ Ok ( packet) => {
92+ if self . is_listening {
93+ self . is_listening = false ;
94+ return Ok ( Some ( StreamerMsg :: Connected {
95+ ip : Some ( self . ip ) ,
96+ port : Some ( self . port ) ,
97+ mode : ConnectionMode :: Udp ,
98+ } ) ) ;
99+ }
98100
99- // send back the same check bytes
100- self . framed
101- . get_ref ( )
102- . send_to ( CHECK_2 . as_bytes ( ) , & src_addr)
103- . await
104- . map_err ( |e| ConnectError :: HandShakeFailed ( "writing" , e) ) ?;
105- }
106- Err ( e) => {
107- // error 10040 is when the buffer is too small
108- // probably because the app is already in a connected state,
109- // by sending audio data
110- if !matches ! ( e. raw_os_error( ) , Some ( 10040 ) ) {
111- return Err ( ConnectError :: HandShakeFailed ( "reading" , e) ) ;
101+ if packet. sequence_number < self . tracked_sequence {
102+ // drop packet
103+ info ! (
104+ "dropped packet: old sequence number {} < {}" ,
105+ packet. sequence_number, self . tracked_sequence
106+ ) ;
107+ }
108+ self . tracked_sequence = packet. sequence_number ;
109+
110+ let packet = packet. audio_packet . unwrap ( ) ;
111+ let buffer_size = packet. buffer . len ( ) ;
112+ let sample_rate = packet. sample_rate ;
113+
114+ match self . stream_config . process_audio_packet ( packet) {
115+ Ok ( Some ( buffer) ) => {
116+ debug ! ( "From {:?}, received {} bytes" , addr, buffer_size) ;
117+ Ok ( Some ( StreamerMsg :: UpdateAudioWave {
118+ data : AudioPacketMessage :: to_wave_data (
119+ & buffer,
120+ sample_rate,
121+ ) ,
122+ } ) )
123+ }
124+ _ => Ok ( None ) ,
125+ }
112126 }
127+ Err ( e) => Err ( ConnectError :: WriteError ( WriteError :: Deserializer ( e) ) ) ,
113128 }
114129 }
115130
116- self . state = UdpStreamerState :: Streaming {
117- tracked_sequence : 0 ,
118- } ;
119-
120- Ok ( Some ( StreamerMsg :: Connected {
131+ Some ( Err ( e) ) => {
132+ match e. kind ( ) {
133+ io:: ErrorKind :: TimedOut => Ok ( None ) , // timeout use to check for input on stdin
134+ io:: ErrorKind :: WouldBlock => Ok ( None ) , // trigger on Linux when there is no stream input
135+ _ => Err ( WriteError :: Io ( e) ) ?,
136+ }
137+ }
138+ None => {
139+ todo ! ( )
140+ }
141+ } ,
142+ Err ( _) => {
143+ self . is_listening = true ;
144+ self . tracked_sequence = 0 ;
145+ Ok ( Some ( StreamerMsg :: Listening {
121146 ip : Some ( self . ip ) ,
122147 port : Some ( self . port ) ,
123- mode : ConnectionMode :: Udp ,
124148 } ) )
125149 }
126-
127- UdpStreamerState :: Streaming { tracked_sequence } => {
128- match tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , self . framed . next ( ) ) . await {
129- Ok ( res) => match res {
130- Some ( Ok ( ( frame, addr) ) ) => {
131- match AudioPacketMessageOrdered :: decode ( frame) {
132- Ok ( packet) => {
133- if packet. sequence_number < * tracked_sequence {
134- // drop packet
135- info ! (
136- "dropped packet: old sequence number {} < {}" ,
137- packet. sequence_number, tracked_sequence
138- ) ;
139- }
140- * tracked_sequence = packet. sequence_number ;
141-
142- let packet = packet. audio_packet . unwrap ( ) ;
143- let buffer_size = packet. buffer . len ( ) ;
144- let sample_rate = packet. sample_rate ;
145-
146- match self . stream_config . process_audio_packet ( packet) {
147- Ok ( Some ( buffer) ) => {
148- debug ! (
149- "From {:?}, received {} bytes" ,
150- addr, buffer_size
151- ) ;
152- Ok ( Some ( StreamerMsg :: UpdateAudioWave {
153- data : AudioPacketMessage :: to_wave_data (
154- & buffer,
155- sample_rate,
156- ) ,
157- } ) )
158- }
159- _ => Ok ( None ) ,
160- }
161- }
162- Err ( e) => {
163- Err ( ConnectError :: WriteError ( WriteError :: Deserializer ( e) ) )
164- }
165- }
166- }
167-
168- Some ( Err ( e) ) => {
169- match e. kind ( ) {
170- io:: ErrorKind :: TimedOut => Ok ( None ) , // timeout use to check for input on stdin
171- io:: ErrorKind :: WouldBlock => Ok ( None ) , // trigger on Linux when there is no stream input
172- _ => Err ( WriteError :: Io ( e) ) ?,
173- }
174- }
175- None => {
176- todo ! ( )
177- }
178- } ,
179- Err ( _) => {
180- self . state = UdpStreamerState :: Listening ;
181- Ok ( Some ( StreamerMsg :: Listening {
182- ip : Some ( self . ip ) ,
183- port : Some ( self . port ) ,
184- } ) )
185- }
186- }
187- }
188150 }
189151 }
190152}
0 commit comments