@@ -56,6 +56,12 @@ public class AtProtoJetstream : IDisposable
5656
5757 private ClientWebSocket _client ;
5858
59+ private bool _closed ;
60+
61+ private bool _closedGracefully ;
62+
63+ private CancellationToken _connectAsyncCancellationToken ;
64+
5965 /// <summary>
6066 /// Creates a new instance of <see cref="Jetstream"/>.
6167 /// </summary>
@@ -172,14 +178,20 @@ public IReadOnlyCollection<Nsid> CollectionFilter
172178 }
173179
174180 /// <summary>
175- /// Gets the underlying <see cref="ClientWebSocket"/> .
181+ /// Gets a flag indicating whether the underlying WebSocket is connected to the jetstream .
176182 /// </summary>
177- protected ClientWebSocket ClientWebSocket => _client ;
183+ public bool IsConnected => _client . State == WebSocketState . Open ;
178184
179185 /// <summary>
180186 /// Gets the <see cref="WebSocketState"/> of the underlying <see cref="ClientWebSocket"/>.
181187 /// </summary>
182- public WebSocketState ClientState => _client . State ;
188+ public WebSocketState State => _client . State ;
189+
190+ /// <summary>
191+ /// Gets a flag indicating whether the underlying WebSocket was disconnected gracefully,
192+ /// by requesting a cancellation on the <see cref="CancellationToken"/> passed passed to <see cref="ConnectAsync(Uri?, long?, CancellationToken)"/>.
193+ /// </summary>
194+ public bool DisconnectedGracefully => IsConnected && ( _closedGracefully || ! _connectAsyncCancellationToken . IsCancellationRequested ) ;
183195
184196 /// <summary>
185197 /// Gets the <see cref="DateTimeOffset"/> indicating when last time a message from the JetsStream was received.
@@ -212,18 +224,22 @@ public IReadOnlyCollection<Nsid> CollectionFilter
212224 /// <returns>A new <see cref="AtProtoJetstreamBuilder"/></returns>
213225 public static AtProtoJetstreamBuilder CreateBuilder ( ) => AtProtoJetstreamBuilder . Create ( ) ;
214226
227+ /// <summary>
228+ /// Gets the underlying <see cref="ClientWebSocket"/>.
229+ /// </summary>
230+ protected ClientWebSocket ClientWebSocket => _client ;
231+
215232 /// <summary>
216233 /// Called to raise any <see cref="MessageReceived"/> events, if any.
217234 /// </summary>
218235 /// <param name="e">The <see cref="MessageReceivedEventArgs"/> for the event.</param>
219236 protected virtual void OnMessageReceived ( MessageReceivedEventArgs e )
220237 {
221- MessageLastReceived = DateTimeOffset . UtcNow ;
222-
223238 EventHandler < MessageReceivedEventArgs > ? messageReceived = MessageReceived ;
224239
225240 if ( ! _disposed )
226241 {
242+ MessageLastReceived = DateTimeOffset . UtcNow ;
227243 _metrics . MessagesReceived ( 1 ) ;
228244 messageReceived ? . Invoke ( this , e ) ;
229245 }
@@ -251,7 +267,7 @@ protected virtual void OnRecordReceived(RecordReceivedEventArgs e)
251267 protected virtual void OnConnectionStateChanged ( ConnectionStateChangedEventArgs e )
252268 {
253269 EventHandler < ConnectionStateChangedEventArgs > ? connectionStatusChanged = ConnectionStateChanged ;
254-
270+
255271 if ( ! _disposed )
256272 {
257273 connectionStatusChanged ? . Invoke ( this , e ) ;
@@ -288,7 +304,7 @@ protected virtual void OnFaultRaised(FaultRaisedEventArgs e)
288304 /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
289305 [ MemberNotNull ( nameof ( _client ) ) ]
290306 public async Task ConnectAsync (
291- CancellationToken ? cancellationToken = default )
307+ CancellationToken cancellationToken = default )
292308 {
293309 await ConnectAsync (
294310 uri : null ,
@@ -307,13 +323,13 @@ await ConnectAsync(
307323 public async Task ConnectAsync (
308324 Uri ? uri = null ,
309325 DateTimeOffset ? startFrom = null ,
310- CancellationToken ? cancellationToken = default )
326+ CancellationToken cancellationToken = default )
311327 {
312328 long ? cursor = null ;
313329
314330 if ( startFrom is not null )
315331 {
316- cursor = DateTimeOffset . UtcNow . ToUnixTimeMilliseconds ( ) * 1000 ;
332+ cursor = startFrom . Value . ToUnixTimeMilliseconds ( ) * 1000 ;
317333 }
318334
319335 await ConnectAsync ( uri , cursor , cancellationToken ) . ConfigureAwait ( false ) ;
@@ -332,9 +348,9 @@ public async Task ConnectAsync(
332348 public async Task ConnectAsync (
333349 Uri ? uri = null ,
334350 long ? cursor = null ,
335- CancellationToken ? cancellationToken = default )
351+ CancellationToken cancellationToken = default )
336352 {
337- CancellationToken ctx = cancellationToken ?? CancellationToken . None ;
353+ _connectAsyncCancellationToken = cancellationToken ;
338354
339355 if ( _client is not null && _client . State == WebSocketState . Open )
340356 {
@@ -415,13 +431,29 @@ public async Task ConnectAsync(
415431
416432 Uri serverUri = new ( uriBuilder . ToString ( ) ) ;
417433
434+ WebSocketState previousState = _client . State ;
435+
418436 JetStreamLogger . ConnectingTo ( _logger , serverUri ) ;
419437
420- await _client . ConnectAsync ( serverUri , ctx ) . ConfigureAwait ( false ) ;
438+ try
439+ {
440+ await _client . ConnectAsync ( serverUri , cancellationToken ) . ConfigureAwait ( false ) ;
441+ _closed = false ;
442+ }
443+ catch ( WebSocketException ex )
444+ {
445+ JetStreamLogger . WebSocketException ( _logger , ex ) ;
446+ }
421447
422- OnConnectionStateChanged ( new ConnectionStateChangedEventArgs ( _client . State ) ) ;
448+ if ( _client . State != previousState )
449+ {
450+ OnConnectionStateChanged ( new ConnectionStateChangedEventArgs ( _client . State ) ) ;
451+ }
423452
424- ReceiveLoop ( ctx ) . FireAndForget ( ) ;
453+ if ( _client . State == WebSocketState . Open )
454+ {
455+ ReceiveLoop ( cancellationToken ) . FireAndForget ( ) ;
456+ }
425457 }
426458
427459 /// <summary>
@@ -435,15 +467,31 @@ public async Task ConnectAsync(
435467 public async Task CloseAsync (
436468 WebSocketCloseStatus status = WebSocketCloseStatus . NormalClosure ,
437469 string statusDescription = "Client disconnect" ,
438- CancellationToken ? cancellationToken = default )
470+ CancellationToken cancellationToken = default )
439471 {
440- CancellationToken ctx = cancellationToken ?? CancellationToken . None ;
472+ WebSocketState startingState = _client . State ;
441473
442474 if ( _client . State == WebSocketState . Open )
443475 {
444476 try
445477 {
446- await _client . CloseAsync ( status , statusDescription , ctx ) . ConfigureAwait ( false ) ;
478+ await _client . CloseAsync ( status , statusDescription , cancellationToken ) . ConfigureAwait ( false ) ;
479+ _closedGracefully = true ;
480+ }
481+ catch ( ObjectDisposedException )
482+ {
483+ throw ;
484+ }
485+ catch ( Exception ex )
486+ {
487+ JetStreamLogger . CloseError ( _logger , ex ) ;
488+ }
489+ }
490+ else if ( _client . State == WebSocketState . Connecting )
491+ {
492+ try
493+ {
494+ _client . Abort ( ) ;
447495 }
448496 catch ( ObjectDisposedException )
449497 {
@@ -455,14 +503,19 @@ public async Task CloseAsync(
455503 }
456504 }
457505
458- OnConnectionStateChanged ( new ConnectionStateChangedEventArgs ( _client . State ) ) ;
506+ _closed = true ;
507+
508+ if ( _client . State != startingState )
509+ {
510+ OnConnectionStateChanged ( new ConnectionStateChangedEventArgs ( _client . State ) ) ;
511+ }
459512 }
460513
461514 /// <summary>
462515 /// Log a fault.
463516 /// </summary>
464517 /// <param name="fault">A description of the fault.</param>
465- protected void LogFault ( string fault = "Unspecified fault" )
518+ protected void LogFault ( string fault = "Unspecified fault" )
466519 {
467520 OnFaultRaised ( new FaultRaisedEventArgs ( fault ) ) ;
468521 }
@@ -511,35 +564,54 @@ private ClientWebSocket CreateWebSocketClient()
511564 return client ;
512565 }
513566
567+ [ SuppressMessage ( "Design" , "CA1031:Do not catch general exception types" , Justification = "Catch all for logging." ) ]
514568 [ SuppressMessage ( "Reliability" , "CA2008:Do not create tasks without passing a TaskScheduler" , Justification = "A scheduler can be configured on the TaskFactory in Options." ) ]
515569 private async Task ReceiveLoop ( CancellationToken cancellationToken )
516570 {
517571 byte [ ] buffer = new byte [ Options . MaximumMessageSize ] ;
572+ WebSocketMessageType expectedMessageType = Options . UseCompression ? WebSocketMessageType . Binary : WebSocketMessageType . Text ;
518573
519- while ( ! cancellationToken . IsCancellationRequested )
520- {
521- WebSocketMessageType expectedMessageType = Options . UseCompression ? WebSocketMessageType . Binary : WebSocketMessageType . Text ;
574+ bool keepRunning = true ;
522575
576+ while ( keepRunning && ! _closed && ! cancellationToken . IsCancellationRequested )
577+ {
523578 try
524579 {
525- if ( _client . State != WebSocketState . Open )
526- {
527- await ConnectAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
528- }
529-
530580 ValueWebSocketReceiveResult result = await _client . ReceiveAsync ( new Memory < byte > ( buffer ) , cancellationToken ) . ConfigureAwait ( false ) ;
531581
532- if ( result . MessageType != expectedMessageType && ! result . EndOfMessage )
582+ if ( result . MessageType == WebSocketMessageType . Close )
533583 {
584+ await _client . CloseOutputAsync ( WebSocketCloseStatus . NormalClosure , null , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
585+ JetStreamLogger . CloseMessageReceived ( _logger ) ;
586+ keepRunning = false ;
534587 continue ;
535588 }
536589
590+ if ( result . MessageType != expectedMessageType )
591+ {
592+ JetStreamLogger . UnexpectedMessageType ( _logger , result . MessageType ) ;
593+
594+ if ( ! result . EndOfMessage )
595+ {
596+ continue ;
597+ }
598+ }
599+
537600 byte [ ] receivedData ;
538601
539602 if ( Options . UseCompression )
540603 {
541- Span < byte > bufferAsSpan = buffer . AsSpan ( 0 , result . Count ) ;
542- receivedData = _decompressor ! . Unwrap ( bufferAsSpan ) . ToArray ( ) ;
604+ try
605+ {
606+ Span < byte > bufferAsSpan = buffer . AsSpan ( 0 , result . Count ) ;
607+ receivedData = _decompressor ! . Unwrap ( bufferAsSpan ) . ToArray ( ) ;
608+ }
609+ catch ( ZstdException ex )
610+ {
611+ // Can't decompress so ignore this message.
612+ JetStreamLogger . DecompressionException ( _logger , ex ) ;
613+ continue ;
614+ }
543615 }
544616 else
545617 {
@@ -552,9 +624,7 @@ private async Task ReceiveLoop(CancellationToken cancellationToken)
552624
553625 if ( ! string . IsNullOrEmpty ( message ) )
554626 {
555-
556627 OnMessageReceived ( new MessageReceivedEventArgs ( message ) ) ;
557- _metrics . MessagesReceived ( 1 ) ;
558628
559629 // Now go to handle message in a new task.
560630#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
@@ -570,26 +640,10 @@ private async Task ReceiveLoop(CancellationToken cancellationToken)
570640 }
571641 }
572642 }
573- catch ( WebSocketException ex )
574- {
575- // Close the client and reopen.
576- LogFault ( ex . Message ) ;
577- _client . Dispose ( ) ;
578- await ConnectAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
579- }
580- catch ( OperationCanceledException )
643+ catch ( Exception e )
581644 {
582- JetStreamLogger . MessageLoopCancellation ( _logger ) ;
583- }
584- catch ( ObjectDisposedException )
585- {
586- throw ;
587- }
588- catch ( Exception ex )
589- {
590- LogFault ( ex . Message ) ;
591- JetStreamLogger . MessageLoopError ( _logger , ex ) ;
592- throw ;
645+ JetStreamLogger . MessageLoopError ( _logger , e ) ;
646+ LogFault ( e . Message ) ;
593647 }
594648 }
595649 }
@@ -672,7 +726,7 @@ private async Task SendOptionsUpdateMessage()
672726 payload . WantedDIDs = [ .. _dids ] ;
673727 }
674728
675- OptionsUpdateMessage optionsUpdateMessage = new ( )
729+ OptionsUpdateMessage optionsUpdateMessage = new ( )
676730 {
677731 Payload = payload
678732 } ;
0 commit comments