Skip to content

Commit b97f16d

Browse files
valldracp1gp1g
andcommitted
Fix network state detection during VPN connectivity changes
Co-authored-by: S1m <31284753+p1gp1g@users.noreply.github.com>
1 parent 5cd3b76 commit b97f16d

File tree

4 files changed

+220
-197
lines changed

4 files changed

+220
-197
lines changed

app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,6 @@ private void onStartUnlock() {
290290
checkFreeDiskSpace();
291291
MemoryTracker.start();
292292
BackupSubscriptionCheckJob.enqueueIfAble();
293-
AppDependencies.getAuthWebSocket().registerKeepAliveToken(SignalWebSocket.FOREGROUND_KEEPALIVE);
294-
AppDependencies.getUnauthWebSocket().registerKeepAliveToken(SignalWebSocket.FOREGROUND_KEEPALIVE);
295293

296294
long lastForegroundTime = SignalStore.misc().getLastForegroundTime();
297295
long currentTime = System.currentTimeMillis();

app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,16 @@ class IncomingMessageObserver(
8383
private val decryptionDrainedListeners: MutableList<Runnable> = CopyOnWriteArrayList()
8484

8585
@Volatile
86-
private var networkIsActive = true
86+
private var networkIsActive: Boolean? = null
8787

8888
private val connectionDecisionSemaphore = Semaphore(0)
89-
private val networkConnectionListener = NetworkConnectionListener(
89+
private val internetConnectivityMonitor = InternetConnectivityMonitor(
9090
connectivityManager = ServiceUtil.getConnectivityManager(context),
91-
onNetworkChange = { state ->
92-
// MOLLY: Accessing libsignalNetwork applies proxy configuration on access
93-
AppDependencies.libsignalNetwork.onNetworkChange()
94-
if (state.isReady) {
91+
onReachabilityChanged = { connectivityState ->
92+
if (networkIsActive != null) {
93+
AppDependencies.libsignalNetwork.onNetworkChange()
94+
}
95+
if (connectivityState.hasInternet) {
9596
networkIsActive = true
9697
} else {
9798
Log.w(TAG, "Lost network connection. Resetting the drained state.")
@@ -103,6 +104,7 @@ class IncomingMessageObserver(
103104
}
104105
releaseConnectionDecisionSemaphore()
105106
},
107+
// MOLLY: Accessing libsignalNetwork applies proxy configuration on access
106108
)
107109

108110
private val messageContentProcessor = MessageContentProcessor(context)
@@ -142,7 +144,7 @@ class IncomingMessageObserver(
142144
}
143145
})
144146

145-
networkConnectionListener.register()
147+
internetConnectivityMonitor.register()
146148

147149
webSocketStateDisposable = authWebSocket
148150
.state
@@ -200,7 +202,7 @@ class IncomingMessageObserver(
200202

201203
val registered = SignalStore.account.isRegistered
202204
val pushAvailable = SignalStore.account.pushAvailable
203-
val hasNetwork = networkIsActive
205+
val hasNetwork = networkIsActive ?: false
204206
val hasProxy = AppDependencies.networkManager.isProxyEnabled
205207
val forceWebsocket = SignalStore.internal.isWebsocketModeForced
206208
val websocketAlreadyOpen = isConnectionAvailable()
@@ -217,7 +219,7 @@ class IncomingMessageObserver(
217219
}
218220

219221
private fun isConnectionAvailable(): Boolean {
220-
return SignalStore.account.isRegistered && (authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED || (authWebSocket.shouldSendKeepAlives() && networkIsActive))
222+
return SignalStore.account.isRegistered && (authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED || (authWebSocket.shouldSendKeepAlives() && networkIsActive ?: true))
221223
}
222224

223225
private fun releaseConnectionDecisionSemaphore() {
@@ -235,7 +237,7 @@ class IncomingMessageObserver(
235237

236238
fun terminate() {
237239
Log.w(TAG, "Termination! ${this.hashCode()}", Throwable())
238-
networkConnectionListener.unregister()
240+
internetConnectivityMonitor.unregister()
239241
webSocketStateDisposable.dispose()
240242
terminated = true
241243
authWebSocket.disconnect()
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright 2026 Molly Instant Messenger
3+
* SPDX-License-Identifier: AGPL-3.0-only
4+
*/
5+
6+
package org.thoughtcrime.securesms.messages
7+
8+
import android.net.ConnectivityManager
9+
import android.net.LinkProperties
10+
import android.net.Network
11+
import android.net.NetworkCapabilities
12+
import android.net.NetworkRequest
13+
import android.os.Build
14+
import kotlinx.coroutines.CoroutineScope
15+
import kotlinx.coroutines.Job
16+
import kotlinx.coroutines.channels.awaitClose
17+
import kotlinx.coroutines.channels.trySendBlocking
18+
import kotlinx.coroutines.flow.Flow
19+
import kotlinx.coroutines.flow.callbackFlow
20+
import kotlinx.coroutines.flow.conflate
21+
import kotlinx.coroutines.flow.distinctUntilChanged
22+
import kotlinx.coroutines.flow.retryWhen
23+
import kotlinx.coroutines.launch
24+
import org.signal.core.util.concurrent.SignalDispatchers
25+
import org.signal.core.util.logging.Log
26+
import org.signal.core.util.zipWithPrevious
27+
28+
enum class ConnectivityState {
29+
OFFLINE,
30+
ONLINE,
31+
ONLINE_VPN,
32+
BLOCKED,
33+
BLOCKED_VPN;
34+
35+
/** Returns true if network traffic expected to reach the Internet. */
36+
val hasInternet: Boolean
37+
get() = this == ONLINE || this == ONLINE_VPN
38+
}
39+
40+
/**
41+
* Observes changes in network connectivity and notifies via [onReachabilityChanged].
42+
*
43+
* The current connectivity state is provided immediately upon registration
44+
* if the device already has network access.
45+
*/
46+
class InternetConnectivityMonitor(
47+
private val connectivityManager: ConnectivityManager,
48+
private val onReachabilityChanged: (ConnectivityState) -> Unit,
49+
) {
50+
companion object {
51+
private val TAG = Log.tag(InternetConnectivityMonitor::class.java)
52+
}
53+
54+
private data class NetworkState(
55+
val validated: Boolean,
56+
val blocked: Boolean,
57+
val onVpn: Boolean
58+
) {
59+
val isReachable: Boolean get() = validated && !blocked
60+
61+
companion object {
62+
val DOWN = NetworkState(validated = false, blocked = false, onVpn = false)
63+
}
64+
}
65+
66+
private class NetworkAggregationCallback(
67+
private val onNetworkStateChanged: (NetworkState) -> Unit,
68+
private val onVpnLoss: () -> Unit,
69+
) : ConnectivityManager.NetworkCallback() {
70+
71+
// Tracks active networks and their states
72+
private val networks = mutableMapOf<Network, NetworkState>()
73+
74+
override fun onCapabilitiesChanged(network: Network, capabilities: NetworkCapabilities) {
75+
val validated = capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)
76+
val vpn = capabilities.hasTransport(NetworkCapabilities.TRANSPORT_VPN)
77+
Log.d(TAG, "onCapabilitiesChanged($network, validated=$validated, vpn=$vpn)")
78+
val existing = networks[network]
79+
if (existing == null) {
80+
networks[network] = NetworkState(validated = validated, blocked = false, onVpn = vpn)
81+
// API 26+ guarantees that onLinkPropertiesChanged is always called next for new networks,
82+
// followed by onBlockedStatusChanged (on API 29+).
83+
} else {
84+
networks[network] = existing.copy(validated = validated, onVpn = vpn)
85+
onNetworkStateChanged(networks.bestNetworkState())
86+
}
87+
}
88+
89+
override fun onLinkPropertiesChanged(network: Network, linkProperties: LinkProperties) {
90+
if (Build.VERSION.SDK_INT < 29) {
91+
onNetworkStateChanged(networks.bestNetworkState())
92+
}
93+
}
94+
95+
override fun onBlockedStatusChanged(network: Network, blocked: Boolean) {
96+
Log.d(TAG, "onBlockedStatusChanged($network, blocked=$blocked)")
97+
val existing = networks[network] ?: return
98+
networks[network] = existing.copy(blocked = blocked)
99+
onNetworkStateChanged(networks.bestNetworkState())
100+
}
101+
102+
override fun onLost(network: Network) {
103+
Log.d(TAG, "onLost($network)")
104+
val removed = networks.remove(network)
105+
if (removed?.onVpn == true) {
106+
onVpnLoss()
107+
} else {
108+
onNetworkStateChanged(networks.bestNetworkState())
109+
}
110+
}
111+
112+
private fun Map<Network, NetworkState>.bestNetworkState(): NetworkState {
113+
return if (isEmpty()) {
114+
NetworkState.DOWN
115+
} else {
116+
// A VPN network is only considered validated if there's also an underlying
117+
// non-VPN network.
118+
val hasUnderlyingNet = values.any { !it.onVpn }
119+
val eligibleStates = values.map { state ->
120+
state.copy(
121+
validated = state.validated && hasUnderlyingNet
122+
)
123+
}
124+
eligibleStates.maxBy { it.rank() }
125+
}
126+
}
127+
128+
private fun NetworkState.rank(): Int =
129+
when {
130+
isReachable && onVpn -> 3
131+
isReachable -> 2
132+
blocked -> 1
133+
else -> 0
134+
}
135+
}
136+
137+
private fun internetConnectionFlow(): Flow<ConnectivityState> = callbackFlow {
138+
val callback = NetworkAggregationCallback(
139+
onNetworkStateChanged = {
140+
val connectivityState = when {
141+
it.isReachable && it.onVpn -> ConnectivityState.ONLINE_VPN
142+
it.isReachable -> ConnectivityState.ONLINE
143+
it.blocked && it.onVpn -> ConnectivityState.BLOCKED_VPN
144+
it.blocked -> ConnectivityState.BLOCKED
145+
else -> ConnectivityState.OFFLINE
146+
}
147+
// Should not block as we conflate the flow.
148+
trySendBlocking(connectivityState)
149+
},
150+
onVpnLoss = {
151+
// VPN transport disconnected. For always-on VPNs with a kill switch,
152+
// the underlying network may still appear "UP" but traffic is blocked.
153+
// Restart the flow to re-evaluate connectivity.
154+
close(NetworkStateStaleException("VPN loss"))
155+
}
156+
)
157+
158+
val request = NetworkRequest.Builder()
159+
.removeCapability(NetworkCapabilities.NET_CAPABILITY_NOT_VPN)
160+
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
161+
.build()
162+
163+
connectivityManager.registerNetworkCallback(request, callback)
164+
165+
awaitClose {
166+
connectivityManager.unregisterNetworkCallback(callback)
167+
}
168+
}.conflate()
169+
170+
private val scope = CoroutineScope(SignalDispatchers.IO)
171+
private var listenerJob: Job? = null
172+
173+
@Synchronized
174+
fun register() {
175+
if (listenerJob != null) return
176+
177+
listenerJob = scope.launch {
178+
internetConnectionFlow()
179+
.retryWhen { cause, _ ->
180+
val retrying = cause is NetworkStateStaleException
181+
Log.i(TAG, "Re-registering callback ($retrying): ${cause.message}")
182+
retrying
183+
}
184+
.distinctUntilChanged()
185+
.zipWithPrevious { prevState, state ->
186+
Log.i(TAG, buildString {
187+
append("Internet reachability: ")
188+
prevState?.let { append("$it -> ") }
189+
append(state)
190+
})
191+
state
192+
}.collect {
193+
onReachabilityChanged(it)
194+
}
195+
}
196+
}
197+
198+
@Synchronized
199+
fun unregister() {
200+
listenerJob?.cancel()
201+
listenerJob = null
202+
}
203+
204+
/**
205+
* Thrown when the tracked network state is no longer reliable.
206+
*/
207+
class NetworkStateStaleException(message: String) : Exception(message)
208+
}

0 commit comments

Comments
 (0)