Skip to content

Commit b550b6a

Browse files
committed
feat: Reduce spawning tokio threads for socket mode
1 parent 818dd7b commit b550b6a

1 file changed

Lines changed: 61 additions & 70 deletions

File tree

src/hyper_tokio/socket_mode/tungstenite_wss_client.rs

Lines changed: 61 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ where
3737
#[derive(Clone, Debug)]
3838
enum SlackTungsteniteWssClientCommand {
3939
Message(String),
40-
Ping,
4140
Pong(Vec<u8>),
4241
Exit,
4342
}
@@ -225,36 +224,55 @@ where
225224
let thread_destroyed = destroyed.clone();
226225

227226
tokio::spawn(async move {
228-
while let Some(message) = rx.recv().await {
227+
let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(
228+
thread_identity.config.ping_interval_in_seconds,
229+
));
230+
ping_interval.tick().await;
231+
232+
loop {
229233
if thread_destroyed.load(Ordering::Relaxed) {
230234
break;
231235
}
232-
match message {
233-
SlackTungsteniteWssClientCommand::Message(body) => {
234-
if writer
235-
.send(tokio_tungstenite::tungstenite::Message::Text(body.into()))
236-
.await
237-
.is_err()
238-
{
239-
rx.close()
240-
}
241-
}
242-
SlackTungsteniteWssClientCommand::Pong(body) => {
243-
trace!(
244-
slack_wss_client_id = thread_identity.id.to_string().as_str(),
245-
"[{}] Pong to Slack: {:?}",
246-
thread_identity.id.to_string(),
247-
body
248-
);
249-
if writer
250-
.send(tokio_tungstenite::tungstenite::Message::Pong(body.into()))
251-
.await
252-
.is_err()
253-
{
254-
rx.close()
236+
tokio::select! {
237+
Some(message) = rx.recv() => {
238+
match message {
239+
SlackTungsteniteWssClientCommand::Message(body) => {
240+
if writer
241+
.send(tokio_tungstenite::tungstenite::Message::Text(body.into()))
242+
.await
243+
.is_err()
244+
{
245+
rx.close()
246+
}
247+
}
248+
SlackTungsteniteWssClientCommand::Pong(body) => {
249+
trace!(
250+
slack_wss_client_id = thread_identity.id.to_string().as_str(),
251+
"[{}] Pong to Slack: {:?}",
252+
thread_identity.id.to_string(),
253+
body
254+
);
255+
if writer
256+
.send(tokio_tungstenite::tungstenite::Message::Pong(body.into()))
257+
.await
258+
.is_err()
259+
{
260+
rx.close()
261+
}
262+
}
263+
SlackTungsteniteWssClientCommand::Exit => {
264+
writer.close().await.unwrap_or(());
265+
rx.close();
266+
trace!(
267+
slack_wss_client_id = thread_identity.id.to_string().as_str(),
268+
"[{}] WSS client command channel has been closed",
269+
thread_identity.id.to_string()
270+
);
271+
break;
272+
}
255273
}
256274
}
257-
SlackTungsteniteWssClientCommand::Ping => {
275+
_ = ping_interval.tick() => {
258276
let body: [u8; 5] = rand::random();
259277
trace!(
260278
slack_wss_client_id = thread_identity.id.to_string().as_str(),
@@ -282,7 +300,13 @@ where
282300
thread_identity.id.to_string(),
283301
seen_pong_time_in_secs
284302
);
285-
rx.close()
303+
rx.close();
304+
writer.close().await.unwrap_or(());
305+
thread_identity
306+
.client_listener
307+
.on_disconnect(&thread_identity.id)
308+
.await;
309+
break;
286310
} else if let Err(err) = writer
287311
.send(tokio_tungstenite::tungstenite::Message::Ping(
288312
body.to_vec().into(),
@@ -295,56 +319,23 @@ where
295319
thread_identity.id.to_string(),
296320
err
297321
);
298-
rx.close()
322+
rx.close();
323+
writer.close().await.unwrap_or(());
324+
thread_identity
325+
.client_listener
326+
.on_disconnect(&thread_identity.id)
327+
.await;
328+
break;
299329
}
300330
}
301-
SlackTungsteniteWssClientCommand::Exit => {
302-
writer.close().await.unwrap_or(());
303-
rx.close();
304-
trace!(
305-
slack_wss_client_id = thread_identity.id.to_string().as_str(),
306-
"[{}] WSS client command channel has been closed",
307-
thread_identity.id.to_string()
308-
);
331+
else => {
309332
break;
310333
}
311334
}
312335
}
313336
});
314337
}
315338

316-
{
317-
let thread_identity = identity.clone();
318-
let ping_tx = tx.clone();
319-
let thread_destroyed = destroyed.clone();
320-
tokio::spawn(async move {
321-
let mut interval = tokio::time::interval(std::time::Duration::from_secs(
322-
thread_identity.config.ping_interval_in_seconds,
323-
));
324-
325-
loop {
326-
if thread_destroyed.load(Ordering::Relaxed) {
327-
break;
328-
}
329-
interval.tick().await;
330-
if !ping_tx.is_closed() {
331-
if ping_tx
332-
.send(SlackTungsteniteWssClientCommand::Ping)
333-
.is_err()
334-
{
335-
break;
336-
}
337-
} else {
338-
thread_identity
339-
.client_listener
340-
.on_disconnect(&thread_identity.id)
341-
.await;
342-
break;
343-
}
344-
}
345-
});
346-
}
347-
348339
{
349340
let thread_identity = identity.clone();
350341
let thread_last_time_pong_received = last_time_pong_received;
@@ -383,7 +374,7 @@ where
383374
slack_wss_client_id = thread_identity.id.to_string().as_str(),
384375
"[{}] Ping from Slack: {:?}",
385376
thread_identity.id.to_string(),
386-
body
377+
&body
387378
);
388379
tx.send(SlackTungsteniteWssClientCommand::Pong(body.into()))
389380
.unwrap_or(());
@@ -393,7 +384,7 @@ where
393384
slack_wss_client_id = thread_identity.id.to_string().as_str(),
394385
"[{}] Pong from Slack: {:?}",
395386
thread_identity.id.to_string(),
396-
body
387+
&body
397388
);
398389
let mut last_pong = thread_last_time_pong_received.write().await;
399390
last_pong.time = SystemTime::now();

0 commit comments

Comments
 (0)