Skip to content

Commit f4229ae

Browse files
committed
fix(sdk): being unable to add a new listener after dropping old ones
1 parent 8764088 commit f4229ae

File tree

7 files changed

+156
-28
lines changed

7 files changed

+156
-28
lines changed

filen-sdk-rs/src/auth/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,10 @@ impl Client {
402402
self.http_client.clone()
403403
}
404404

405+
pub(crate) fn arc_client_ref(&self) -> &Arc<AuthClient> {
406+
&self.http_client
407+
}
408+
405409
pub fn crypter(&self) -> Arc<impl MetaCrypter> {
406410
self.auth_info
407411
.read()

filen-sdk-rs/src/socket/native.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,19 @@ impl Client {
2727
event_types: Option<Vec<Cow<'static, str>>>,
2828
) -> Result<ListenerHandle, Error> {
2929
let request_sender = {
30-
let mut socket_handle = self.socket_handle.lock().unwrap();
31-
socket_handle.get_request_sender(self.arc_client())
30+
let mut socket_handle = self.socket_handle.lock().unwrap_or_else(|e| e.into_inner());
31+
socket_handle.get_request_sender(self.arc_client_ref())
3232
};
3333
request_sender
3434
.add_event_listener(callback, event_types)
3535
.await
3636
}
3737

3838
pub fn is_socket_connected(&self) -> bool {
39-
self.socket_handle.lock().unwrap().request_sender.is_some()
39+
self.socket_handle
40+
.lock()
41+
.unwrap_or_else(|e| e.into_inner())
42+
.is_connected()
4043
}
4144

4245
// we need to expose this for v3 because most of the returned events are encrypted
@@ -54,7 +57,7 @@ impl Client {
5457
#[derive(Default)]
5558
pub(crate) struct WebSocketHandle {
5659
// closes websocket thread on drop
57-
request_sender: Option<tokio::sync::mpsc::Sender<SocketRequest>>,
60+
request_sender: Option<tokio::sync::mpsc::WeakSender<SocketRequest>>,
5861
}
5962

6063
struct RequestSender(tokio::sync::mpsc::Sender<SocketRequest>);
@@ -93,15 +96,21 @@ impl RequestSender {
9396
}
9497

9598
impl WebSocketHandle {
96-
fn get_request_sender(&mut self, client: Arc<AuthClient>) -> RequestSender {
97-
RequestSender(match &self.request_sender {
98-
Some(s) => s.clone(),
99-
None => {
100-
let request_sender = spawn_websocket_thread(client);
101-
self.request_sender = Some(request_sender.clone());
102-
request_sender
103-
}
104-
})
99+
fn get_request_sender(&mut self, client: &Arc<AuthClient>) -> RequestSender {
100+
if let Some(weak_sender) = &self.request_sender
101+
&& let Some(sender) = weak_sender.upgrade()
102+
{
103+
return RequestSender(sender);
104+
}
105+
let sender = spawn_websocket_thread(Arc::clone(client));
106+
self.request_sender = Some(sender.downgrade());
107+
RequestSender(sender)
108+
}
109+
110+
fn is_connected(&self) -> bool {
111+
self.request_sender
112+
.as_ref()
113+
.is_some_and(|weak| weak.strong_count() > 0)
105114
}
106115
}
107116

@@ -319,7 +328,7 @@ fn handle_message(
319328
/// Handles a socket request, modifying the listener manager as needed.
320329
///
321330
/// Returns true if there are no more listeners after handling the request.
322-
fn handle_request(request: SocketRequest, listeners: &mut impl ListenerManagerExt) -> bool {
331+
fn handle_request(request: SocketRequest, listeners: &mut impl ListenerManagerExt) {
323332
match request {
324333
SocketRequest::AddListener {
325334
id_sender,
@@ -338,7 +347,6 @@ fn handle_request(request: SocketRequest, listeners: &mut impl ListenerManagerEx
338347
listeners.remove_listener(idx);
339348
}
340349
}
341-
listeners.is_empty()
342350
}
343351

344352
/// Handles the initialized websocket connection, processing incoming messages and managing listeners.
@@ -360,11 +368,7 @@ async fn handle_initialized_websocket(
360368
should_retry = false;
361369
break;
362370
};
363-
should_retry = !handle_request(request, listeners);
364-
if !should_retry {
365-
// no more listeners, shutting down websocket task
366-
break;
367-
}
371+
handle_request(request, listeners);
368372
}
369373
message_result = streams.read.next() => {
370374
if let Err((should_continue, error)) = handle_message(message_result, listeners) {

filen-sdk-rs/src/socket/shared.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ mod listener_manager {
122122
this_id
123123
}
124124

125-
fn remove_listener(&mut self, id: u64) {
126-
self.callbacks_mut().remove(&id);
125+
fn remove_listener(&mut self, id: u64) -> Option<EventListenerCallback> {
126+
let callback = self.callbacks_mut().remove(&id);
127127
let mut empty_event_types = Vec::new();
128128
for (event_type, callbacks) in self.callbacks_for_event_mut().iter_mut() {
129129
callbacks.retain(|&callback_id| callback_id != id);
@@ -136,6 +136,8 @@ mod listener_manager {
136136
}
137137
self.global_callbacks_mut()
138138
.retain(|&callback_id| callback_id != id);
139+
140+
callback
139141
}
140142
}
141143

@@ -285,7 +287,10 @@ mod listener_manager {
285287
}
286288

287289
fn remove_listener(&mut self, id: u64) {
288-
ListenerManagerExtInner::remove_listener(self, id);
290+
let callback = ListenerManagerExtInner::remove_listener(self, id);
291+
if let Some(callback) = callback {
292+
callback(&SocketEvent::Unsubscribed);
293+
}
289294
self.new_ids.retain(|new_id_struct| new_id_struct.id != id);
290295
}
291296
}
@@ -369,7 +374,10 @@ mod listener_manager {
369374
}
370375

371376
fn remove_listener(&mut self, id: u64) {
372-
ListenerManagerExtInner::remove_listener(self, id);
377+
let callback = ListenerManagerExtInner::remove_listener(self, id);
378+
if let Some(callback) = callback {
379+
callback(&SocketEvent::Unsubscribed);
380+
}
373381
}
374382
}
375383
}

filen-sdk-rs/src/socket/uniffi/events.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum SocketEvent {
2727
AuthFailed,
2828
/// Sent when the socket has unexpectedly closed and begins attempting to reconnect
2929
Reconnecting,
30+
/// Sent when the handle to the event listener has been dropped and the listener is removed
31+
Unsubscribed,
3032
NewEvent(NewEvent),
3133
FileRename(FileRename),
3234
FileArchiveRestored(FileArchiveRestored),
@@ -76,6 +78,7 @@ impl From<&BorrowedSocketEvent<'_>> for SocketEvent {
7678
BorrowedSocketEvent::AuthSuccess => Self::AuthSuccess,
7779
BorrowedSocketEvent::AuthFailed => Self::AuthFailed,
7880
BorrowedSocketEvent::Reconnecting => Self::Reconnecting,
81+
BorrowedSocketEvent::Unsubscribed => Self::Unsubscribed,
7982
BorrowedSocketEvent::NewEvent(e) => Self::NewEvent(e.into()),
8083
BorrowedSocketEvent::FileRename(e) => Self::FileRename(e.into()),
8184
BorrowedSocketEvent::FileArchiveRestored(e) => Self::FileArchiveRestored(e.into()),
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use std::{borrow::Cow, time::Duration};
2+
3+
use filen_macros::shared_test_runtime;
4+
use filen_types::{api::v3::socket::SocketEvent, traits::CowHelpers};
5+
// separate file because it needs to avoid interference with other tests
6+
7+
async fn await_event<F>(
8+
receiver: &mut tokio::sync::mpsc::UnboundedReceiver<SocketEvent<'static>>,
9+
mut filter: F,
10+
timeout: Duration,
11+
) -> Result<SocketEvent<'static>, Cow<'static, str>>
12+
where
13+
F: FnMut(&SocketEvent) -> bool,
14+
{
15+
let sleep_until = tokio::time::Instant::now() + timeout;
16+
loop {
17+
tokio::select! {
18+
_ = tokio::time::sleep_until(sleep_until) => {
19+
return Err("Timed out waiting for event".into());
20+
}
21+
event = receiver.recv() => {
22+
let event = event.ok_or(Cow::Borrowed("Expected to receive event"))?;
23+
if filter(&event) {
24+
return Ok(event);
25+
}
26+
}
27+
}
28+
}
29+
}
30+
31+
#[shared_test_runtime]
32+
async fn test_websocket_disconnect_reconnect() {
33+
let client = test_utils::RESOURCES.client().await;
34+
35+
let (events_sender, mut events_receiver) = tokio::sync::mpsc::unbounded_channel();
36+
37+
assert!(!client.is_socket_connected());
38+
39+
let handle = client
40+
.add_event_listener(
41+
Box::new(move |event| {
42+
events_sender
43+
.send(event.as_borrowed_cow().into_owned_cow())
44+
.unwrap();
45+
}),
46+
None,
47+
)
48+
.await
49+
.unwrap();
50+
assert!(client.is_socket_connected());
51+
52+
await_event(
53+
&mut events_receiver,
54+
|event| matches!(event, SocketEvent::AuthSuccess),
55+
Duration::from_secs(20),
56+
)
57+
.await
58+
.unwrap();
59+
60+
std::mem::drop(handle);
61+
assert!(!client.is_socket_connected());
62+
await_event(
63+
&mut events_receiver,
64+
|event| matches!(event, SocketEvent::Unsubscribed),
65+
Duration::from_secs(20),
66+
)
67+
.await
68+
.unwrap();
69+
70+
// making sure it can reconnect
71+
72+
let (events_sender, mut events_receiver) = tokio::sync::mpsc::unbounded_channel();
73+
74+
assert!(!client.is_socket_connected());
75+
76+
let handle = client
77+
.add_event_listener(
78+
Box::new(move |event| {
79+
events_sender
80+
.send(event.as_borrowed_cow().into_owned_cow())
81+
.unwrap();
82+
}),
83+
None,
84+
)
85+
.await
86+
.unwrap();
87+
assert!(client.is_socket_connected());
88+
89+
await_event(
90+
&mut events_receiver,
91+
|event| matches!(event, SocketEvent::AuthSuccess),
92+
Duration::from_secs(20),
93+
)
94+
.await
95+
.unwrap();
96+
97+
std::mem::drop(handle);
98+
assert!(!client.is_socket_connected());
99+
await_event(
100+
&mut events_receiver,
101+
|event| matches!(event, SocketEvent::Unsubscribed),
102+
Duration::from_secs(20),
103+
)
104+
.await
105+
.unwrap();
106+
}

filen-sdk-rs/tests/socket_tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ async fn test_websocket_auth() {
7474
await_event(
7575
&mut events_receiver,
7676
|event| *event == SocketEvent::AuthSuccess,
77-
Duration::from_secs(10),
77+
Duration::from_secs(20),
7878
)
7979
.await;
8080
}
@@ -111,7 +111,7 @@ async fn test_websocket_event_filtering() {
111111
await_event(
112112
&mut events_receiver,
113113
|event| *event == SocketEvent::AuthSuccess,
114-
Duration::from_secs(10),
114+
Duration::from_secs(20),
115115
)
116116
.await;
117117

@@ -159,7 +159,7 @@ async fn test_websocket_file_folder_creation() {
159159
}
160160
_ => false,
161161
},
162-
Duration::from_secs(10),
162+
Duration::from_secs(20),
163163
)
164164
.await;
165165

@@ -169,7 +169,7 @@ async fn test_websocket_file_folder_creation() {
169169
SocketEvent::FileNew(data) => data.bucket == file_1.bucket && data.uuid == file_1.uuid,
170170
_ => false,
171171
},
172-
Duration::from_secs(10),
172+
Duration::from_secs(20),
173173
)
174174
.await;
175175
}

filen-types/src/api/v3/socket.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ pub enum SocketEvent<'a> {
105105
AuthFailed,
106106
/// Sent when the socket has unexpectedly closed and begins attempting to reconnect
107107
Reconnecting,
108+
/// Sent when the handle to the event listener has been dropped and the listener is removed
109+
Unsubscribed,
108110
#[serde(borrow)]
109111
NewEvent(NewEvent<'a>),
110112
#[serde(borrow)]
@@ -318,6 +320,7 @@ impl SocketEvent<'_> {
318320
SocketEvent::AuthSuccess => "authSuccess",
319321
SocketEvent::AuthFailed => "authFailed",
320322
SocketEvent::Reconnecting => "reconnecting",
323+
SocketEvent::Unsubscribed => "unsubscribed",
321324
SocketEvent::NewEvent(_) => "newEvent",
322325
SocketEvent::FileRename(_) => "fileRename",
323326
SocketEvent::FileArchiveRestored(_) => "fileArchiveRestored",

0 commit comments

Comments
 (0)