Skip to content

Commit ecbbcc4

Browse files
committed
udp impl of handshake
1 parent 81bc783 commit ecbbcc4

File tree

5 files changed

+165
-57
lines changed

5 files changed

+165
-57
lines changed

Android/app/src/main/java/io/github/teamclouday/AndroidMic/domain/streaming/MicStreamManager.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class MicStreamManager(
3030
}
3131

3232
Mode.UDP -> {
33-
UdpStreamer(scope, ip!!, port!!)
33+
UdpStreamer(scope, ip!!, port)
3434
}
3535
}
3636

Android/app/src/main/java/io/github/teamclouday/AndroidMic/domain/streaming/UdpStreamer.kt

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import java.net.DatagramPacket
1414
import java.net.DatagramSocket
1515
import java.net.InetAddress
1616

17-
class UdpStreamer(private val scope: CoroutineScope, val ip: String, val port: Int) : Streamer {
17+
class UdpStreamer(private val scope: CoroutineScope, val ip: String, var port: Int?) : Streamer {
1818

1919
private val TAG: String = "UDP streamer"
2020

@@ -24,7 +24,27 @@ class UdpStreamer(private val scope: CoroutineScope, val ip: String, val port: I
2424
private var sequenceIdx = 0
2525

2626
override fun connect(): Boolean {
27-
return true
27+
28+
val p = port
29+
if (p != null) {
30+
31+
if (!handShake(p, 1500)) {
32+
Log.d(TAG, "connect [Socket]: handshake error")
33+
socket.close()
34+
return false
35+
}
36+
return true
37+
} else {
38+
for (p in DEFAULT_PORT..MAX_PORT) {
39+
if (!handShake(p, 100)) {
40+
continue
41+
}
42+
this.port = p
43+
return true
44+
}
45+
}
46+
47+
return false
2848
}
2949

3050
override fun disconnect(): Boolean {
@@ -64,7 +84,7 @@ class UdpStreamer(private val scope: CoroutineScope, val ip: String, val port: I
6484
0,
6585
combined.size,
6686
address,
67-
port
87+
port!!
6888
)
6989

7090
socket.send(packet)
@@ -82,4 +102,34 @@ class UdpStreamer(private val scope: CoroutineScope, val ip: String, val port: I
82102
override fun isAlive(): Boolean {
83103
return true
84104
}
105+
106+
fun handShake(
107+
p: Int,
108+
timeout: Int
109+
): Boolean {
110+
111+
return try {
112+
113+
val array = CHECK_1.toByteArray()
114+
val packet = DatagramPacket(
115+
array,
116+
0,
117+
array.size,
118+
address,
119+
p
120+
)
121+
122+
socket.send(packet)
123+
124+
val buff = ByteArray(CHECK_2.length)
125+
val recvPacket = DatagramPacket(buff, buff.size)
126+
127+
socket.soTimeout = timeout
128+
socket.receive(recvPacket)
129+
130+
recvPacket.data.contentEquals(CHECK_2.toByteArray())
131+
} catch (_: Exception) {
132+
false
133+
}
134+
}
85135
}

RustApp/src/streamer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub use streamer_runner::{ConnectOption, StreamerCommand, StreamerMsg, sub};
3030
use crate::{audio::AudioProcessParams, config::AudioFormat};
3131

3232
/// Default port on the PC
33-
const DEFAULT_PC_PORT: u16 = 55666;
33+
const DEFAULT_PC_PORT: u16 = 55555;
3434
const MAX_PORT: u16 = 60000;
3535

3636
const CHECK_1: &str = "AndroidMic1";

RustApp/src/streamer/udp_streamer.rs

Lines changed: 100 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio_util::{codec::LengthDelimitedCodec, udp::UdpFramed};
77

88
use crate::{
99
config::ConnectionMode,
10-
streamer::{AudioPacketMessage, DEFAULT_PC_PORT, MAX_PORT, WriteError},
10+
streamer::{AudioPacketMessage, CHECK_1, CHECK_2, DEFAULT_PC_PORT, MAX_PORT, WriteError},
1111
};
1212

1313
use super::{AudioPacketMessageOrdered, AudioStream, ConnectError, StreamerMsg, StreamerTrait};
@@ -20,8 +20,13 @@ pub struct UdpStreamer {
2020
ip: IpAddr,
2121
pub port: u16,
2222
stream_config: AudioStream,
23+
state: UdpStreamerState,
2324
framed: UdpFramed<LengthDelimitedCodec>,
24-
tracked_sequence: u32,
25+
}
26+
27+
enum UdpStreamerState {
28+
Listening,
29+
Streaming { tracked_sequence: u32 },
2530
}
2631

2732
pub async fn new(ip: IpAddr, stream_config: AudioStream) -> Result<UdpStreamer, ConnectError> {
@@ -49,8 +54,8 @@ pub async fn new(ip: IpAddr, stream_config: AudioStream) -> Result<UdpStreamer,
4954
ip,
5055
port: addr.port(),
5156
stream_config,
57+
state: UdpStreamerState::Listening,
5258
framed: UdpFramed::new(socket, LengthDelimitedCodec::new()),
53-
tracked_sequence: 0,
5459
};
5560

5661
Ok(streamer)
@@ -62,55 +67,110 @@ impl StreamerTrait for UdpStreamer {
6267
}
6368

6469
fn status(&self) -> StreamerMsg {
65-
StreamerMsg::Connected {
66-
ip: Some(self.ip),
67-
port: Some(self.port),
68-
mode: ConnectionMode::Udp,
70+
match self.state {
71+
UdpStreamerState::Listening => StreamerMsg::Listening {
72+
ip: Some(self.ip),
73+
port: Some(self.port),
74+
},
75+
UdpStreamerState::Streaming { .. } => StreamerMsg::Connected {
76+
ip: Some(self.ip),
77+
port: Some(self.port),
78+
mode: ConnectionMode::Udp,
79+
},
6980
}
7081
}
7182

7283
async fn next(&mut self) -> Result<Option<StreamerMsg>, ConnectError> {
73-
match self.framed.next().await {
74-
Some(Ok((frame, addr))) => {
75-
match AudioPacketMessageOrdered::decode(frame) {
76-
Ok(packet) => {
77-
if packet.sequence_number < self.tracked_sequence {
78-
// drop packet
79-
info!(
80-
"dropped packet: old sequence number {} < {}",
81-
packet.sequence_number, self.tracked_sequence
82-
);
84+
match &mut self.state {
85+
UdpStreamerState::Listening => {
86+
let mut buf1 = [0u8; CHECK_1.len()];
87+
88+
match self.framed.get_ref().recv_from(&mut buf1).await {
89+
Ok((_, src_addr)) => {
90+
if buf1 != CHECK_1.as_bytes() {
91+
let s = String::from_utf8_lossy(&buf1);
92+
93+
return Err(ConnectError::HandShakeFailed2(format!(
94+
"{} != {}",
95+
CHECK_1, s
96+
)));
8397
}
84-
self.tracked_sequence = packet.sequence_number;
85-
86-
let packet = packet.audio_packet.unwrap();
87-
let buffer_size = packet.buffer.len();
88-
let sample_rate = packet.sample_rate;
89-
90-
match self.stream_config.process_audio_packet(packet) {
91-
Ok(Some(buffer)) => {
92-
debug!("From {:?}, received {} bytes", addr, buffer_size);
93-
Ok(Some(StreamerMsg::UpdateAudioWave {
94-
data: AudioPacketMessage::to_wave_data(&buffer, sample_rate),
95-
}))
96-
}
97-
_ => Ok(None),
98+
99+
// send back the same check bytes
100+
self.framed
101+
.get_ref()
102+
.send_to(CHECK_2.as_bytes(), &src_addr)
103+
.await
104+
.map_err(|e| ConnectError::HandShakeFailed("writing", e))?;
105+
}
106+
Err(e) => {
107+
// error 10040 is when the buffer is too small
108+
// probably because the app is already in a connected state,
109+
// by sending audio data
110+
if !matches!(e.raw_os_error(), Some(10040)) {
111+
return Err(ConnectError::HandShakeFailed("reading", e));
98112
}
99113
}
100-
Err(e) => Err(ConnectError::WriteError(WriteError::Deserializer(e))),
101114
}
115+
116+
self.state = UdpStreamerState::Streaming {
117+
tracked_sequence: 0,
118+
};
119+
120+
Ok(Some(StreamerMsg::Connected {
121+
ip: Some(self.ip),
122+
port: Some(self.port),
123+
mode: ConnectionMode::Udp,
124+
}))
102125
}
103126

104-
Some(Err(e)) => {
105-
match e.kind() {
106-
io::ErrorKind::TimedOut => Ok(None), // timeout use to check for input on stdin
107-
io::ErrorKind::WouldBlock => Ok(None), // trigger on Linux when there is no stream input
108-
_ => Err(WriteError::Io(e))?,
127+
UdpStreamerState::Streaming { tracked_sequence } => {
128+
match self.framed.next().await {
129+
Some(Ok((frame, addr))) => {
130+
match AudioPacketMessageOrdered::decode(frame) {
131+
Ok(packet) => {
132+
if packet.sequence_number < *tracked_sequence {
133+
// drop packet
134+
info!(
135+
"dropped packet: old sequence number {} < {}",
136+
packet.sequence_number, tracked_sequence
137+
);
138+
}
139+
*tracked_sequence = packet.sequence_number;
140+
141+
let packet = packet.audio_packet.unwrap();
142+
let buffer_size = packet.buffer.len();
143+
let sample_rate = packet.sample_rate;
144+
145+
match self.stream_config.process_audio_packet(packet) {
146+
Ok(Some(buffer)) => {
147+
debug!("From {:?}, received {} bytes", addr, buffer_size);
148+
Ok(Some(StreamerMsg::UpdateAudioWave {
149+
data: AudioPacketMessage::to_wave_data(
150+
&buffer,
151+
sample_rate,
152+
),
153+
}))
154+
}
155+
_ => Ok(None),
156+
}
157+
}
158+
Err(e) => Err(ConnectError::WriteError(WriteError::Deserializer(e))),
159+
}
160+
}
161+
162+
Some(Err(e)) => {
163+
match e.kind() {
164+
io::ErrorKind::TimedOut => Ok(None), // timeout use to check for input on stdin
165+
io::ErrorKind::WouldBlock => Ok(None), // trigger on Linux when there is no stream input
166+
_ => Err(WriteError::Io(e))?,
167+
}
168+
}
169+
None => {
170+
todo!()
171+
}
109172
}
110173
}
111-
None => {
112-
todo!()
113-
}
114174
}
115175
}
116176
}

RustApp/src/ui/app.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ impl Application for AppState {
478478
return self.add_log(format!("Listening on `{ip}:{port}`").as_str());
479479
}
480480
}
481-
StreamerMsg::Connected { ip, port, mode } => {
481+
StreamerMsg::Connected { ip, port, mode: _ } => {
482482
if let Some(system_tray) = self.system_tray.as_mut() {
483483
system_tray.update_menu_state(false, &fl!("state_connected"));
484484
}
@@ -490,17 +490,15 @@ impl Application for AppState {
490490
port.unwrap_or_default()
491491
);
492492

493-
if mode != ConnectionMode::Udp {
494-
// show notification when app is minimized
495-
let _ = Notification::new()
496-
.summary("AndroidMic")
497-
.body(format!("Connected on {address}").as_str())
498-
.auto_icon()
499-
.show()
500-
.map_err(|e| {
501-
error!("failed to show notification: {e}");
502-
});
503-
}
493+
// show notification when app is minimized
494+
let _ = Notification::new()
495+
.summary("AndroidMic")
496+
.body(format!("Connected on {address}").as_str())
497+
.auto_icon()
498+
.show()
499+
.map_err(|e| {
500+
error!("failed to show notification: {e}");
501+
});
504502
}
505503

506504
self.connection_state = ConnectionState::Connected;

0 commit comments

Comments
 (0)