Skip to content

Commit 81bc783

Browse files
committed
add handshake
1 parent 38ee02c commit 81bc783

File tree

10 files changed

+126
-40
lines changed

10 files changed

+126
-40
lines changed

Android/app/src/main/java/io/github/teamclouday/AndroidMic/Preferences.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class AppPreferences(
2323
val mode = enumPreference("mode", Mode.WIFI)
2424

2525
val ip = stringPreference("ip", "192.168.")
26-
val port = stringPreference("port", "55555")
26+
val port = stringPreference("port", "")
2727

2828

2929
val theme = enumPreference("theme", Themes.System)

Android/app/src/main/java/io/github/teamclouday/AndroidMic/domain/service/Command.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ data class CommandData(
5959
command = Command.entries[msg.what],
6060
mode = msg.data.getOrdinal(ID_MODE)?.let { Mode.entries[it] },
6161
ip = msg.data.getString(ID_IP),
62-
port = msg.data.getInt(ID_PORT),
62+
port = msg.data.getInt(ID_PORT).let { if (it == -1) null else it },
6363
sampleRate = msg.data.getOrdinal(ID_SAMPLE_RATE)?.let { SampleRates.entries[it] },
6464
channelCount = msg.data.getOrdinal(ID_CHANNEL_COUNT)
6565
?.let { ChannelCount.entries[it] },
@@ -75,7 +75,7 @@ data class CommandData(
7575
this.mode?.let { r.putInt(ID_MODE, it.ordinal) }
7676

7777
this.ip?.let { r.putString(ID_IP, it) }
78-
this.port?.let { r.putInt(ID_PORT, it) }
78+
r.putInt(ID_PORT, this.port ?: -1)
7979

8080
this.sampleRate?.let { r.putInt(ID_SAMPLE_RATE, it.ordinal) }
8181
this.channelCount?.let { r.putInt(ID_CHANNEL_COUNT, it.ordinal) }

Android/app/src/main/java/io/github/teamclouday/AndroidMic/domain/streaming/MicStreamManager.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class MicStreamManager(
1818

1919
private var streamer: Streamer = when (mode) {
2020
Mode.WIFI -> {
21-
TcpStreamer.wifi(ctx, scope, ip!!, port!!)
21+
TcpStreamer.wifi(ctx, scope, ip!!, port)
2222
}
2323

2424
Mode.ADB -> {

Android/app/src/main/java/io/github/teamclouday/AndroidMic/domain/streaming/Streamer.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ import android.os.Messenger
44
import io.github.teamclouday.AndroidMic.domain.service.AudioPacket
55
import kotlinx.coroutines.flow.Flow
66

7+
const val CHECK_1 = "AndroidMic1"
8+
const val CHECK_2 = "AndroidMic2"
9+
10+
const val DEFAULT_PORT: Int = 55555;
11+
const val MAX_PORT: Int = 60000;
12+
713
interface Streamer {
814
fun connect(): Boolean
915
fun disconnect(): Boolean

Android/app/src/main/java/io/github/teamclouday/AndroidMic/domain/streaming/TcpStreamer.kt

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,20 @@ import java.net.SocketTimeoutException
2323
class TcpStreamer(
2424
private val scope: CoroutineScope,
2525
private val tag: String,
26-
ip: String,
27-
port: Int
26+
private val ip: String,
27+
private var port: Int?
2828
) : Streamer {
2929

30-
3130
private var socket: Socket? = null
32-
private var address: String
33-
private val port: Int
3431
private var streamJob: Job? = null
3532

3633
companion object {
3734

38-
private const val MAX_WAIT_TIME = 1500 // timeout
39-
4035
fun wifi(
4136
ctx: Context,
4237
scope: CoroutineScope,
4338
ip: String,
44-
port: Int
39+
port: Int?
4540
): TcpStreamer {
4641

4742
// check WIFI
@@ -73,28 +68,36 @@ class TcpStreamer(
7368
)
7469
}
7570

76-
init {
77-
val inetSocketAddress = InetSocketAddress(ip, port)
78-
this.address = inetSocketAddress.hostName
79-
this.port = inetSocketAddress.port
80-
}
81-
8271
// connect to server
8372
override fun connect(): Boolean {
84-
// create socket
85-
socket = Socket()
86-
try {
87-
socket?.connect(InetSocketAddress(address, port), MAX_WAIT_TIME)
88-
} catch (e: IOException) {
89-
Log.d(tag, "connect [Socket]: ${e.message}")
90-
null
91-
} catch (e: SocketTimeoutException) {
92-
Log.d(tag, "connect [Socket]: ${e.message}")
93-
null
94-
} ?: return false
95-
socket?.soTimeout = MAX_WAIT_TIME
9673

97-
return true
74+
val p = port
75+
if (p != null) {
76+
val socket = createSocket(p, 1500) ?: return false
77+
78+
if (!handShake(socket)) {
79+
Log.d(tag, "connect [Socket]: handshake error")
80+
socket.close()
81+
return false
82+
}
83+
this.socket = socket
84+
return true
85+
} else {
86+
for (p in DEFAULT_PORT..MAX_PORT) {
87+
val socket = createSocket(p, 100) ?: continue
88+
socket.soTimeout = 1500
89+
if (!handShake(socket)) {
90+
socket.close()
91+
continue
92+
}
93+
94+
this.socket = socket
95+
this.port = p
96+
return true
97+
}
98+
}
99+
100+
return false
98101
}
99102

100103
// stream data through socket
@@ -115,7 +118,7 @@ class TcpStreamer(
115118
val pack = message.toByteArray()
116119

117120
socket!!.outputStream.write(pack.size.toBigEndianU32())
118-
socket!!.outputStream.write(message.toByteArray())
121+
socket!!.outputStream.write(pack)
119122
socket!!.outputStream.flush()
120123
} catch (e: IOException) {
121124
Log.d(tag, "${e.message}")
@@ -149,7 +152,6 @@ class TcpStreamer(
149152
// shutdown streamer
150153
override fun shutdown() {
151154
disconnect()
152-
address = ""
153155
}
154156

155157
// get connected server information
@@ -162,4 +164,39 @@ class TcpStreamer(
162164
override fun isAlive(): Boolean {
163165
return (socket != null && socket?.isConnected == true)
164166
}
167+
168+
169+
fun createSocket(p: Int, timeout: Int): Socket? {
170+
val socket = Socket()
171+
return try {
172+
socket.connect(InetSocketAddress(ip, p), timeout)
173+
socket.soTimeout = timeout
174+
socket
175+
} catch (e: IOException) {
176+
Log.d(tag, "connect [Socket]: ${e.message}")
177+
null
178+
} catch (e: SocketTimeoutException) {
179+
Log.d(tag, "connect [Socket]: ${e.message}")
180+
null
181+
}
182+
}
183+
184+
fun handShake(
185+
socket: Socket,
186+
): Boolean {
187+
188+
return try {
189+
val out = socket.getOutputStream()
190+
out.write(CHECK_1.toByteArray())
191+
out.flush()
192+
193+
val input = socket.getInputStream()
194+
val msgBuf = ByteArray(CHECK_2.length)
195+
input.read(msgBuf)
196+
msgBuf.contentEquals(CHECK_2.toByteArray())
197+
198+
} catch (_: Exception) {
199+
false
200+
}
201+
}
165202
}

Android/app/src/main/java/io/github/teamclouday/AndroidMic/ui/MainViewModel.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,18 @@ class MainViewModel : ViewModel() {
116116

117117
when (mode) {
118118
Mode.WIFI, Mode.UDP -> {
119-
if (!checkIp(ip) || !checkPort(port)) {
119+
if (!checkIp(ip) || !checkPort(port, true)) {
120120
uiHelper.makeToast(
121121
uiHelper.getString(R.string.invalid_ip_port)
122122
)
123123
return Dialogs.IpPort
124124
}
125125
data.ip = ip
126-
data.port = port.toInt()
126+
data.port = try {
127+
port.toInt()
128+
} catch (_: Exception) {
129+
null
130+
}
127131
}
128132

129133
else -> {}

Android/app/src/main/java/io/github/teamclouday/AndroidMic/utils/Utils.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ fun checkIp(ip: String): Boolean {
2121
}
2222
}
2323

24-
fun checkPort(portStr: String): Boolean {
24+
fun checkPort(portStr: String, emptyAllowed: Boolean): Boolean {
25+
26+
if (portStr.isEmpty()) {
27+
return emptyAllowed
28+
}
29+
2530
val port = try {
2631
portStr.toInt()
2732
} catch (e: NumberFormatException) {

RustApp/build.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ fn main() -> io::Result<()> {
2020
set_env("ANDROID_MIC_FORMAT");
2121
set_env("ANDROID_MIC_COMMIT");
2222

23+
println!("cargo:rerun-if-changed=src/proto/message.proto");
24+
2325
// build protobuf
2426
prost_build::Config::new()
2527
.out_dir("src/streamer")

RustApp/src/streamer/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ pub use streamer_runner::{ConnectOption, StreamerCommand, StreamerMsg, sub};
3030
use crate::{audio::AudioProcessParams, config::AudioFormat};
3131

3232
/// Default port on the PC
33-
const DEFAULT_PC_PORT: u16 = 55555;
33+
const DEFAULT_PC_PORT: u16 = 55666;
3434
const MAX_PORT: u16 = 60000;
3535

36+
const CHECK_1: &str = "AndroidMic1";
37+
const CHECK_2: &str = "AndroidMic2";
38+
3639
pub struct AudioStream {
3740
pub buff: Producer<u8>,
3841
pub audio_params: AudioProcessParams,
@@ -127,6 +130,10 @@ enum ConnectError {
127130
AdbStatusCommand { code: Option<i32>, stderr: String },
128131
#[error("command failed: {0} make sure adb is installed and in your PATH")]
129132
CommandFailed(io::Error),
133+
#[error("Handshake failed: {0} {1}")]
134+
HandShakeFailed(&'static str, io::Error),
135+
#[error("Handshake failed: {0}")]
136+
HandShakeFailed2(String),
130137
}
131138

132139
#[derive(Debug, Error)]

RustApp/src/streamer/tcp_streamer.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ use std::{io, net::IpAddr, time::Duration};
22

33
use futures::StreamExt;
44
use prost::Message;
5-
use tokio::net::{TcpListener, TcpStream};
5+
use tokio::{
6+
io::{AsyncReadExt, AsyncWriteExt},
7+
net::{TcpListener, TcpStream},
8+
};
69
use tokio_util::codec::{Framed, LengthDelimitedCodec};
710

811
use crate::{
912
config::ConnectionMode,
10-
streamer::{DEFAULT_PC_PORT, MAX_PORT, StreamerMsg, WriteError},
13+
streamer::{CHECK_1, CHECK_2, DEFAULT_PC_PORT, MAX_PORT, StreamerMsg, WriteError},
1114
};
1215

1316
use super::{AudioPacketMessage, AudioStream, ConnectError, StreamerTrait};
@@ -92,7 +95,29 @@ impl StreamerTrait for TcpStreamer {
9295

9396
info!("TCP server listening on {}", addr);
9497

95-
let (stream, addr) = listener.accept().await.map_err(ConnectError::CantAccept)?;
98+
let (mut stream, addr) =
99+
listener.accept().await.map_err(ConnectError::CantAccept)?;
100+
101+
let mut buf1 = [0u8; CHECK_1.len()];
102+
103+
stream
104+
.read_exact(&mut buf1)
105+
.await
106+
.map_err(|e| ConnectError::HandShakeFailed("reading", e))?;
107+
108+
if buf1 != CHECK_1.as_bytes() {
109+
let s = String::from_utf8_lossy(&buf1);
110+
111+
return Err(ConnectError::HandShakeFailed2(format!(
112+
"{} != {}",
113+
CHECK_1, s
114+
)));
115+
}
116+
117+
stream
118+
.write_all(CHECK_2.as_bytes())
119+
.await
120+
.map_err(|e| ConnectError::HandShakeFailed("writing", e))?;
96121

97122
info!("connection accepted, remote address: {}", addr);
98123

0 commit comments

Comments
 (0)