Skip to content

Commit e556ba0

Browse files
valldracp1gp1g
andcommitted
Fix network state detection during VPN connectivity changes
Co-authored-by: S1m <31284753+p1gp1g@users.noreply.github.com>
1 parent 5cd3b76 commit e556ba0

File tree

3 files changed

+165
-142
lines changed

3 files changed

+165
-142
lines changed

app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,6 @@ private void onStartUnlock() {
290290
checkFreeDiskSpace();
291291
MemoryTracker.start();
292292
BackupSubscriptionCheckJob.enqueueIfAble();
293-
AppDependencies.getAuthWebSocket().registerKeepAliveToken(SignalWebSocket.FOREGROUND_KEEPALIVE);
294-
AppDependencies.getUnauthWebSocket().registerKeepAliveToken(SignalWebSocket.FOREGROUND_KEEPALIVE);
295293

296294
long lastForegroundTime = SignalStore.misc().getLastForegroundTime();
297295
long currentTime = System.currentTimeMillis();

app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,16 @@ class IncomingMessageObserver(
8383
private val decryptionDrainedListeners: MutableList<Runnable> = CopyOnWriteArrayList()
8484

8585
@Volatile
86-
private var networkIsActive = true
86+
private var networkIsActive: Boolean? = null
8787

8888
private val connectionDecisionSemaphore = Semaphore(0)
8989
private val networkConnectionListener = NetworkConnectionListener(
9090
connectivityManager = ServiceUtil.getConnectivityManager(context),
91-
onNetworkChange = { state ->
92-
// MOLLY: Accessing libsignalNetwork applies proxy configuration on access
93-
AppDependencies.libsignalNetwork.onNetworkChange()
94-
if (state.isReady) {
91+
onReachabilityChanged = { connectivityState ->
92+
if (networkIsActive != null) {
93+
AppDependencies.libsignalNetwork.onNetworkChange()
94+
}
95+
if (connectivityState.hasInternet) {
9596
networkIsActive = true
9697
} else {
9798
Log.w(TAG, "Lost network connection. Resetting the drained state.")
@@ -103,6 +104,7 @@ class IncomingMessageObserver(
103104
}
104105
releaseConnectionDecisionSemaphore()
105106
},
107+
// MOLLY: Accessing libsignalNetwork applies proxy configuration on access
106108
)
107109

108110
private val messageContentProcessor = MessageContentProcessor(context)
@@ -200,7 +202,7 @@ class IncomingMessageObserver(
200202

201203
val registered = SignalStore.account.isRegistered
202204
val pushAvailable = SignalStore.account.pushAvailable
203-
val hasNetwork = networkIsActive
205+
val hasNetwork = networkIsActive ?: false
204206
val hasProxy = AppDependencies.networkManager.isProxyEnabled
205207
val forceWebsocket = SignalStore.internal.isWebsocketModeForced
206208
val websocketAlreadyOpen = isConnectionAvailable()
@@ -217,7 +219,7 @@ class IncomingMessageObserver(
217219
}
218220

219221
private fun isConnectionAvailable(): Boolean {
220-
return SignalStore.account.isRegistered && (authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED || (authWebSocket.shouldSendKeepAlives() && networkIsActive))
222+
return SignalStore.account.isRegistered && (authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED || (authWebSocket.shouldSendKeepAlives() && networkIsActive ?: true))
221223
}
222224

223225
private fun releaseConnectionDecisionSemaphore() {
Lines changed: 156 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 Signal Messenger, LLC
2+
* Copyright 2026 Molly Instant Messenger
33
* SPDX-License-Identifier: AGPL-3.0-only
44
*/
55

@@ -10,176 +10,199 @@ import android.net.LinkProperties
1010
import android.net.Network
1111
import android.net.NetworkCapabilities
1212
import android.net.NetworkRequest
13-
import android.net.ProxyInfo
13+
import android.os.Build
14+
import kotlinx.coroutines.CoroutineScope
15+
import kotlinx.coroutines.Job
16+
import kotlinx.coroutines.channels.awaitClose
17+
import kotlinx.coroutines.channels.trySendBlocking
18+
import kotlinx.coroutines.flow.Flow
19+
import kotlinx.coroutines.flow.callbackFlow
20+
import kotlinx.coroutines.flow.conflate
21+
import kotlinx.coroutines.flow.distinctUntilChanged
22+
import kotlinx.coroutines.flow.retryWhen
23+
import kotlinx.coroutines.launch
24+
import org.signal.core.util.concurrent.SignalDispatchers
1425
import org.signal.core.util.logging.Log
15-
16-
data class NetworkState(
17-
val available: Boolean,
18-
val validated: Boolean,
19-
val httpProxy: ProxyInfo?,
20-
) {
21-
val isReady: Boolean
22-
get() = available && validated
23-
24-
companion object {
25-
val DOWN = NetworkState(available = false, validated = false, httpProxy = null)
26-
}
27-
28-
override fun toString(): String {
29-
return buildString {
30-
append(
31-
when {
32-
available && validated -> "UP"
33-
available && !validated -> "BLOCKED"
34-
else -> "DOWN"
35-
}
36-
)
37-
append('/')
38-
append(if (httpProxy != null) "PROXY" else "NO-PROXY")
39-
}
40-
}
26+
import org.signal.core.util.zipWithPrevious
27+
28+
enum class ConnectivityState {
29+
OFFLINE,
30+
ONLINE,
31+
ONLINE_VPN,
32+
BLOCKED,
33+
BLOCKED_VPN;
34+
35+
/** Returns true if network traffic expected to reach the Internet. */
36+
val hasInternet: Boolean
37+
get() = this == ONLINE || this == ONLINE_VPN
4138
}
4239

4340
/**
44-
* Observes changes in network connectivity and notifies via [onNetworkChange].
41+
* Observes changes in network connectivity and notifies via [onReachabilityChanged].
4542
*
46-
* The current connection state is also provided immediately upon registration.
43+
* The current connectivity state is provided immediately upon registration
44+
* if the device already has network access.
4745
*/
4846
class NetworkConnectionListener(
49-
val connectivityManager: ConnectivityManager,
50-
private val onNetworkChange: (NetworkState) -> Unit,
47+
private val connectivityManager: ConnectivityManager,
48+
private val onReachabilityChanged: (ConnectivityState) -> Unit,
5149
) {
5250
companion object {
5351
private val TAG = Log.tag(NetworkConnectionListener::class.java)
5452
}
5553

56-
inner class NetworkStateCallback : ConnectivityManager.NetworkCallback() {
57-
58-
// Tracks active networks and their states
59-
private val networks = mutableMapOf<Network, NetworkState>()
60-
61-
// Last dispatched connection state (null until first set)
62-
private var connectionState: NetworkState? = null
63-
64-
fun startMonitoring() {
65-
val request = NetworkRequest.Builder()
66-
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
67-
.build()
68-
69-
connectivityManager.registerNetworkCallback(request, this)
70-
71-
val network = connectivityManager.activeNetwork
72-
val caps = connectivityManager.getNetworkCapabilities(network)
73-
val props = connectivityManager.getLinkProperties(network)
74-
val hasInternet = caps?.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) ?: false
75-
val validated = caps?.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED) ?: false
76-
val httpProxy = props?.httpProxy
77-
78-
val activeNetworkState = NetworkState(
79-
available = hasInternet,
80-
validated = validated,
81-
httpProxy = httpProxy,
82-
)
54+
private data class NetworkState(
55+
val validated: Boolean,
56+
val blocked: Boolean,
57+
val onVpn: Boolean
58+
) {
59+
val isReachable: Boolean get() = validated && !blocked
8360

84-
Log.d(TAG, "Active network snapshot: $activeNetworkState (network=$network)")
85-
onInitialConnectionState(network, activeNetworkState)
86-
}
87-
88-
fun stopMonitoring() {
89-
connectivityManager.unregisterNetworkCallback(this)
61+
companion object {
62+
val DOWN = NetworkState(validated = false, blocked = false, onVpn = false)
9063
}
64+
}
9165

92-
private fun updateConnectionState(): NetworkState? {
93-
val newState = networks.bestNetworkState()
94-
val oldState = connectionState
66+
private class NetworkAggregationCallback(
67+
private val onNetworkStateChanged: (NetworkState) -> Unit,
68+
private val onVpnLoss: () -> Unit,
69+
) : ConnectivityManager.NetworkCallback() {
9570

96-
return if (newState != oldState) {
97-
connectionState = newState
98-
if (oldState == null) {
99-
Log.i(TAG, "Network state initialized -> $newState")
100-
} else {
101-
Log.i(TAG, "Network state changed: $oldState -> $newState")
102-
}
103-
newState
104-
} else null
105-
}
71+
// Tracks active networks and their states
72+
private val networks = mutableMapOf<Network, NetworkState>()
10673

107-
private fun Map<Network, NetworkState>.bestNetworkState(): NetworkState =
108-
values.firstOrNull { it.validated }
109-
?: values.firstOrNull { it.available }
110-
?: NetworkState.DOWN
111-
112-
private fun onInitialConnectionState(network: Network?, state: NetworkState) {
113-
synchronized(this) {
114-
if (connectionState == null) {
115-
if (network != null) {
116-
networks[network] = state
117-
}
118-
updateConnectionState()
119-
} else {
120-
Log.v(TAG, "Initial state skipped; already set: $connectionState")
121-
null
122-
}
123-
}?.notifyChange()
74+
override fun onCapabilitiesChanged(network: Network, capabilities: NetworkCapabilities) {
75+
val validated = capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)
76+
val vpn = capabilities.hasTransport(NetworkCapabilities.TRANSPORT_VPN)
77+
Log.d(TAG, "onCapabilitiesChanged($network, validated=$validated, vpn=$vpn)")
78+
val existing = networks[network]
79+
if (existing == null) {
80+
networks[network] = NetworkState(validated = validated, blocked = false, onVpn = vpn)
81+
// API 26+ guarantees that onLinkPropertiesChanged is always called next for new networks,
82+
// followed by onBlockedStatusChanged (on API 29+).
83+
} else {
84+
networks[network] = existing.copy(validated = validated, onVpn = vpn)
85+
onNetworkStateChanged(networks.bestNetworkState())
86+
}
12487
}
12588

12689
override fun onLinkPropertiesChanged(network: Network, linkProperties: LinkProperties) {
127-
Log.d(TAG, "NetworkCallback onLinkPropertiesChanged($network)")
128-
val httpProxy = linkProperties.httpProxy
129-
synchronized(this) {
130-
val existing = networks.getOrDefault(
131-
network,
132-
NetworkState(available = true, validated = false, httpProxy = httpProxy)
133-
)
134-
val state = existing.copy(httpProxy = httpProxy)
135-
networks[network] = state
136-
updateConnectionState()
137-
}?.notifyChange()
90+
if (Build.VERSION.SDK_INT < 29) {
91+
onNetworkStateChanged(networks.bestNetworkState())
92+
}
13893
}
13994

14095
override fun onBlockedStatusChanged(network: Network, blocked: Boolean) {
141-
Log.d(TAG, "NetworkCallback onBlockedStatusChanged($network, $blocked)")
142-
val validated = !blocked
143-
synchronized(this) {
144-
val existing = networks.getOrDefault(
145-
network,
146-
NetworkState(available = true, validated = validated, httpProxy = null)
147-
)
148-
val state = existing.copy(validated = validated)
149-
networks[network] = state
150-
updateConnectionState()
151-
}?.notifyChange()
96+
Log.d(TAG, "onBlockedStatusChanged($network, blocked=$blocked)")
97+
val existing = networks[network] ?: return
98+
networks[network] = existing.copy(blocked = blocked)
99+
onNetworkStateChanged(networks.bestNetworkState())
152100
}
153101

154102
override fun onLost(network: Network) {
155-
Log.d(TAG, "NetworkCallback onLost($network)")
156-
synchronized(this) {
157-
networks.remove(network)
158-
updateConnectionState()
159-
}?.notifyChange()
103+
Log.d(TAG, "onLost($network)")
104+
val removed = networks.remove(network)
105+
if (removed?.onVpn == true) {
106+
onVpnLoss()
107+
} else {
108+
onNetworkStateChanged(networks.bestNetworkState())
109+
}
160110
}
161111

162-
private fun NetworkState.notifyChange() {
163-
onNetworkChange(this)
112+
private fun Map<Network, NetworkState>.bestNetworkState(): NetworkState {
113+
return if (isEmpty()) {
114+
NetworkState.DOWN
115+
} else {
116+
// A VPN network is only considered validated if there's also an underlying
117+
// non-VPN network.
118+
val hasUnderlyingNet = values.any { !it.onVpn }
119+
val eligibleStates = values.map { state ->
120+
state.copy(
121+
validated = state.validated && hasUnderlyingNet
122+
)
123+
}
124+
eligibleStates.maxBy { it.rank() }
125+
}
164126
}
127+
128+
private fun NetworkState.rank(): Int =
129+
when {
130+
isReachable && onVpn -> 3
131+
isReachable -> 2
132+
blocked -> 1
133+
else -> 0
134+
}
165135
}
166136

167-
private var networkStateCallback: NetworkStateCallback? = null
137+
private fun internetConnectionFlow(): Flow<ConnectivityState> = callbackFlow {
138+
val callback = NetworkAggregationCallback(
139+
onNetworkStateChanged = {
140+
val connectivityState = when {
141+
it.isReachable && it.onVpn -> ConnectivityState.ONLINE_VPN
142+
it.isReachable -> ConnectivityState.ONLINE
143+
it.blocked && it.onVpn -> ConnectivityState.BLOCKED_VPN
144+
it.blocked -> ConnectivityState.BLOCKED
145+
else -> ConnectivityState.OFFLINE
146+
}
147+
// Should not block as we conflate the flow.
148+
trySendBlocking(connectivityState)
149+
},
150+
onVpnLoss = {
151+
// VPN transport disconnected. For always-on VPNs with a kill switch,
152+
// the underlying network may still appear "UP" but traffic is blocked.
153+
// Restart the flow to re-evaluate connectivity.
154+
close(NetworkStateStaleException("VPN loss"))
155+
}
156+
)
157+
158+
val request = NetworkRequest.Builder()
159+
.removeCapability(NetworkCapabilities.NET_CAPABILITY_NOT_VPN)
160+
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
161+
.build()
162+
163+
connectivityManager.registerNetworkCallback(request, callback)
164+
165+
awaitClose {
166+
connectivityManager.unregisterNetworkCallback(callback)
167+
}
168+
}.conflate()
169+
170+
private val scope = CoroutineScope(SignalDispatchers.IO)
171+
private var listenerJob: Job? = null
168172

169173
@Synchronized
170174
fun register() {
171-
if (networkStateCallback == null) {
172-
networkStateCallback = NetworkStateCallback().apply {
173-
startMonitoring()
174-
}
175-
} else {
176-
Log.w(TAG, "Already registered")
175+
if (listenerJob != null) return
176+
177+
listenerJob = scope.launch {
178+
internetConnectionFlow()
179+
.retryWhen { cause, _ ->
180+
val retrying = cause is NetworkStateStaleException
181+
Log.i(TAG, "Re-registering callback ($retrying): ${cause.message}")
182+
retrying
183+
}
184+
.distinctUntilChanged()
185+
.zipWithPrevious { prevState, state ->
186+
Log.i(TAG, buildString {
187+
append("Internet reachability: ")
188+
prevState?.let { append("$it -> ") }
189+
append(state)
190+
})
191+
state
192+
}.collect {
193+
onReachabilityChanged(it)
194+
}
177195
}
178196
}
179197

180198
@Synchronized
181199
fun unregister() {
182-
networkStateCallback?.stopMonitoring()
183-
networkStateCallback = null
200+
listenerJob?.cancel()
201+
listenerJob = null
184202
}
203+
204+
/**
205+
* Thrown when the tracked network state is no longer reliable.
206+
*/
207+
class NetworkStateStaleException(message: String) : Exception(message)
185208
}

0 commit comments

Comments
 (0)