@@ -2,8 +2,9 @@ use itoa::Buffer;
22#[ cfg( feature = "simd-json" ) ]
33use simd_json:: Mutable ;
44use tokio:: { sync:: broadcast, time:: Instant } ;
5+ use futures_util:: StreamExt ;
56use tracing:: { info, trace} ;
6- use twilight_gateway:: { parse, ConnectionStatus , Event , EventType , EventTypeFlags , Message , Shard } ;
7+ use twilight_gateway:: { parse, Event , EventType , EventTypeFlags , Message , Shard } ;
78use twilight_model:: gateway:: event:: GatewayEvent as TwilightGatewayEvent ;
89
910use std:: {
@@ -25,13 +26,13 @@ pub type BroadcastMessage = (String, Option<SequenceInfo>);
2526const TEN_SECONDS : Duration = Duration :: from_secs ( 10 ) ;
2627
2728pub async fn events (
28- mut shard : Shard ,
29+ mut shard : Shard < twilight_gateway_queue :: InMemoryQueue > ,
2930 shard_state : Arc < ShardState > ,
3031 shard_id : u32 ,
3132 shard_count : u32 ,
3233 broadcast_tx : broadcast:: Sender < BroadcastMessage > ,
3334 client : Arc < twilight_http:: Client > ,
34- ) {
35+ ) {
3536 // This method only wants to relay events while the shard is in a READY state
3637 // Therefore, we only put events in the queue while we are connected and READY
3738 let mut is_ready = false ;
@@ -49,24 +50,20 @@ pub async fn events(
4950
5051 if now. duration_since ( last_metrics_update) > TEN_SECONDS {
5152 let latencies = shard. latency ( ) . recent ( ) ;
52- let info = shard. status ( ) ;
53- update_shard_statistics ( & shard_id_str, & shard_state, info, latencies) ;
53+ let info = shard. state ( ) ;
54+ update_shard_statistics ( & shard_id_str, & shard_state, & info, latencies) ;
5455 last_metrics_update = now;
5556 }
5657
57- let payload = match shard. next_message ( ) . await {
58- Ok ( Message :: Text ( payload) ) => payload,
59- Ok ( Message :: Close ( _) ) if SHUTDOWN . load ( Ordering :: Relaxed ) => return ,
60- Ok ( Message :: Close ( _) ) => continue ,
61- Err ( e) => {
58+ let payload = match shard. next ( ) . await {
59+ Some ( Ok ( Message :: Text ( payload) ) ) => payload,
60+ Some ( Ok ( Message :: Close ( _) ) ) if SHUTDOWN . load ( Ordering :: Relaxed ) => return ,
61+ Some ( Ok ( Message :: Close ( _) ) ) => continue ,
62+ Some ( Err ( e) ) => {
6263 tracing:: error!( "Error receiving message: {e}" ) ;
63-
64- if e. is_fatal ( ) {
65- return ;
66- }
67-
6864 continue ;
6965 }
66+ None => return ,
7067 } ;
7168
7269 // NOTE: payload cannot be modified because we have to do optional event parsing
@@ -166,16 +163,25 @@ pub async fn events(
166163pub fn update_shard_statistics (
167164 shard_id : & str ,
168165 shard_state : & Arc < ShardState > ,
169- connection_status : & ConnectionStatus ,
166+ connection_status : & impl std :: fmt :: Debug ,
170167 latencies : & [ Duration ] ,
171168) {
172- // There is no way around this, sadly
173- let connection_status = match connection_status {
174- ConnectionStatus :: Connected => 4.0 ,
175- ConnectionStatus :: Disconnected { .. } => 1.0 ,
176- ConnectionStatus :: Identifying => 2.0 ,
177- ConnectionStatus :: Resuming => 3.0 ,
178- ConnectionStatus :: FatallyClosed { .. } => 0.0 ,
169+ // We don't want to depend on a specific twilight type name here. Use the
170+ // Debug representation to determine a rough numeric status mapping.
171+ let status_str = format ! ( "{:?}" , connection_status) ;
172+ let connection_status = if status_str. contains ( "Connected" ) {
173+ 4.0
174+ } else if status_str. contains ( "Identifying" ) {
175+ 2.0
176+ } else if status_str. contains ( "Resuming" ) {
177+ 3.0
178+ } else if status_str. contains ( "Disconnected" ) {
179+ 1.0
180+ } else if status_str. contains ( "FatallyClosed" ) {
181+ 0.0
182+ } else {
183+ // Unknown state
184+ f64:: NAN
179185 } ;
180186
181187 let latency = latencies. first ( ) . map_or ( f64:: NAN , Duration :: as_secs_f64) ;
0 commit comments