Skip to content

Commit 5a93d6c

Browse files
authored
Always restart if the WsClient encounters an error (#3355)
Fixes #3347
1 parent f085918 commit 5a93d6c

File tree

1 file changed

+59
-66
lines changed

1 file changed

+59
-66
lines changed

src/gateway/sharding/shard_runner.rs

Lines changed: 59 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -266,60 +266,6 @@ impl ShardRunner {
266266
}
267267
}
268268

269-
// Handles a received value over the shard runner rx channel.
270-
//
271-
// Returns a boolean on whether the shard runner can continue.
272-
//
273-
// This always returns true, except in the case that the shard manager asked the runner to
274-
// shutdown or restart.
275-
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
276-
async fn handle_rx_value(&mut self, msg: ShardRunnerMessage) -> bool {
277-
match msg {
278-
ShardRunnerMessage::Restart => {
279-
self.restart().await;
280-
false
281-
},
282-
ShardRunnerMessage::Shutdown(code) => {
283-
self.shutdown(code).await;
284-
false
285-
},
286-
ShardRunnerMessage::ChunkGuild {
287-
guild_id,
288-
limit,
289-
presences,
290-
filter,
291-
nonce,
292-
} => self
293-
.shard
294-
.chunk_guild(guild_id, limit, presences, filter, nonce.as_deref())
295-
.await
296-
.is_ok(),
297-
ShardRunnerMessage::SetPresence {
298-
activity,
299-
status,
300-
} => {
301-
if let Some(activity) = activity {
302-
self.shard.set_activity(activity);
303-
}
304-
if let Some(status) = status {
305-
self.shard.set_status(status);
306-
}
307-
self.shard.update_presence().await.is_ok()
308-
},
309-
#[cfg(feature = "voice")]
310-
ShardRunnerMessage::UpdateVoiceState {
311-
guild_id,
312-
channel_id,
313-
self_mute,
314-
self_deaf,
315-
} => self
316-
.shard
317-
.update_voice_state(guild_id, channel_id, self_mute, self_deaf)
318-
.await
319-
.is_ok(),
320-
}
321-
}
322-
323269
#[cfg(feature = "voice")]
324270
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
325271
async fn handle_voice_event(&self, event: &Event) {
@@ -345,29 +291,76 @@ impl ShardRunner {
345291
}
346292
}
347293

348-
// Receives values over the internal shard runner rx channel and handles them. This will loop
349-
// over values until there is no longer one.
350-
//
351-
// Requests a restart if the sending half of the channel disconnects. This should _never_
352-
// happen, as the sending half is kept on the runner.
294+
// Receives messages over the internal `runner_rx` channel and handles them. Will loop over all
295+
// queued messages until the channel is empty. Requests a restart if handling a message fails.
353296
//
354-
// Returns whether the shard runner is in a state that can continue.
297+
// Returns whether the shard runner can continue executing its main loop.
355298
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
356299
async fn recv(&mut self) -> bool {
357300
while let Ok(msg) = self.runner_rx.try_next() {
358-
if let Some(value) = msg {
359-
if !self.handle_rx_value(value).await {
301+
let Some(msg) = msg else {
302+
// This should never happen, because `self.runner_tx` always holds a copy of the
303+
// other end of the channel.
304+
warn!(
305+
"[ShardRunner {:?}] Internal channel tx dropped; restarting",
306+
self.shard.shard_info(),
307+
);
308+
309+
self.restart().await;
310+
return false;
311+
};
312+
313+
let res = match msg {
314+
ShardRunnerMessage::Restart => {
315+
self.restart().await;
360316
return false;
361-
}
362-
} else {
363-
warn!("[ShardRunner {:?}] Sending half DC; restarting", self.shard.shard_info(),);
317+
},
318+
ShardRunnerMessage::Shutdown(code) => {
319+
self.shutdown(code).await;
320+
return false;
321+
},
322+
ShardRunnerMessage::ChunkGuild {
323+
guild_id,
324+
limit,
325+
presences,
326+
filter,
327+
nonce,
328+
} => {
329+
self.shard
330+
.chunk_guild(guild_id, limit, presences, filter, nonce.as_deref())
331+
.await
332+
},
333+
ShardRunnerMessage::SetPresence {
334+
activity,
335+
status,
336+
} => {
337+
if let Some(activity) = activity {
338+
self.shard.set_activity(activity);
339+
}
340+
if let Some(status) = status {
341+
self.shard.set_status(status);
342+
}
343+
self.shard.update_presence().await
344+
},
345+
#[cfg(feature = "voice")]
346+
ShardRunnerMessage::UpdateVoiceState {
347+
guild_id,
348+
channel_id,
349+
self_mute,
350+
self_deaf,
351+
} => {
352+
self.shard.update_voice_state(guild_id, channel_id, self_mute, self_deaf).await
353+
},
354+
};
355+
356+
if let Err(why) = res {
357+
warn!("[ShardRunner {:?}] Websocket error: {:?}", self.shard.shard_info(), why);
364358

365359
self.restart().await;
366360
return false;
367361
}
368362
}
369363

370-
// There are no longer any values available.
371364
true
372365
}
373366

0 commit comments

Comments
 (0)