1919import java .nio .ByteBuffer ;
2020import java .nio .charset .Charset ;
2121import java .nio .charset .StandardCharsets ;
22+ import java .util .Objects ;
2223import java .util .function .Function ;
2324import java .util .function .IntPredicate ;
2425
26+ import org .eclipse .jetty .util .BufferUtil ;
2527import org .eclipse .jetty .websocket .api .Callback ;
26- import org .eclipse .jetty .websocket .api .Frame ;
2728import org .eclipse .jetty .websocket .api .Session ;
28- import org .eclipse .jetty .websocket .api .annotations .OnWebSocketClose ;
29- import org .eclipse .jetty .websocket .api .annotations .OnWebSocketError ;
30- import org .eclipse .jetty .websocket .api .annotations .OnWebSocketFrame ;
31- import org .eclipse .jetty .websocket .api .annotations .OnWebSocketMessage ;
32- import org .eclipse .jetty .websocket .api .annotations .OnWebSocketOpen ;
33- import org .eclipse .jetty .websocket .api .annotations .WebSocket ;
34- import org .eclipse .jetty .websocket .core .OpCode ;
29+ import org .reactivestreams .Subscriber ;
30+ import org .reactivestreams .Subscription ;
3531
3632import org .springframework .core .io .buffer .CloseableDataBuffer ;
3733import org .springframework .core .io .buffer .DataBuffer ;
3834import org .springframework .core .io .buffer .DataBufferFactory ;
39- import org .springframework .lang .Nullable ;
4035import org .springframework .util .Assert ;
4136import org .springframework .web .reactive .socket .CloseStatus ;
4237import org .springframework .web .reactive .socket .WebSocketHandler ;
4338import org .springframework .web .reactive .socket .WebSocketMessage ;
4439import org .springframework .web .reactive .socket .WebSocketMessage .Type ;
4540
4641/**
47- * Jetty {@link WebSocket @WebSocket } handler that delegates events to a
42+ * Jetty {@link org.eclipse.jetty.websocket.api.Session.Listener } handler that delegates events to a
4843 * reactive {@link WebSocketHandler} and its session.
4944 *
5045 * @author Violeta Georgieva
5146 * @author Rossen Stoyanchev
5247 * @since 5.0
5348 */
54- @ WebSocket
55- public class JettyWebSocketHandlerAdapter {
56- private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer .wrap (new byte [0 ]);
57-
49+ public class JettyWebSocketHandlerAdapter implements Session .Listener {
5850 private final WebSocketHandler delegateHandler ;
5951
6052 private final Function <Session , JettyWebSocketSession > sessionFactory ;
6153
62- @ Nullable
54+ @ SuppressWarnings ( "NotNullFieldNotInitialized" )
6355 private JettyWebSocketSession delegateSession ;
6456
6557 public JettyWebSocketHandlerAdapter (WebSocketHandler handler ,
@@ -71,65 +63,63 @@ public JettyWebSocketHandlerAdapter(WebSocketHandler handler,
7163 this .sessionFactory = sessionFactory ;
7264 }
7365
74- @ OnWebSocketOpen
66+ @ Override
7567 public void onWebSocketOpen (Session session ) {
76- this .delegateSession = this .sessionFactory .apply (session );
68+ this .delegateSession = Objects . requireNonNull ( this .sessionFactory .apply (session ) );
7769 this .delegateHandler .handle (this .delegateSession )
78- .checkpoint (session .getUpgradeRequest ().getRequestURI () + " [JettyWebSocketHandlerAdapter]" )
79- .subscribe (this .delegateSession );
70+ .subscribe (new Subscriber <>() {
71+ @ Override
72+ public void onSubscribe (Subscription s ) {
73+ s .request (Long .MAX_VALUE );
74+ }
75+
76+ @ Override
77+ public void onNext (Void unused ) {
78+ }
79+
80+ @ Override
81+ public void onError (Throwable t ) {
82+ delegateSession .onHandlerError (t );
83+ }
84+
85+ @ Override
86+ public void onComplete () {
87+ delegateSession .onHandleComplete ();
88+ }
89+ });
8090 }
8191
82- @ OnWebSocketMessage
92+ @ Override
8393 public void onWebSocketText (String message ) {
84- if (this .delegateSession != null ) {
85- byte [] bytes = message .getBytes (StandardCharsets .UTF_8 );
86- DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (bytes );
87- WebSocketMessage webSocketMessage = new WebSocketMessage (Type .TEXT , buffer );
88- this .delegateSession .handleMessage (webSocketMessage .getType (), webSocketMessage );
89- }
94+ byte [] bytes = message .getBytes (StandardCharsets .UTF_8 );
95+ DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (bytes );
96+ WebSocketMessage webSocketMessage = new WebSocketMessage (Type .TEXT , buffer );
97+ this .delegateSession .handleMessage (webSocketMessage );
9098 }
9199
92- @ OnWebSocketMessage
100+ @ Override
93101 public void onWebSocketBinary (ByteBuffer byteBuffer , Callback callback ) {
94- if (this .delegateSession != null ) {
95- DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (byteBuffer );
96- buffer = new JettyDataBuffer (buffer , callback );
97- WebSocketMessage webSocketMessage = new WebSocketMessage (Type .BINARY , buffer );
98- this .delegateSession .handleMessage (webSocketMessage .getType (), webSocketMessage );
99- }
100- else {
101- callback .succeed ();
102- }
102+ DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (byteBuffer );
103+ buffer = new JettyDataBuffer (buffer , callback );
104+ WebSocketMessage webSocketMessage = new WebSocketMessage (Type .BINARY , buffer );
105+ this .delegateSession .handleMessage (webSocketMessage );
103106 }
104107
105- @ OnWebSocketFrame
106- public void onWebSocketFrame (Frame frame , Callback callback ) {
107- if (this .delegateSession != null ) {
108- if (OpCode .PONG == frame .getOpCode ()) {
109- ByteBuffer byteBuffer = (frame .getPayload () != null ? frame .getPayload () : EMPTY_PAYLOAD );
110- DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (byteBuffer );
111- buffer = new JettyDataBuffer (buffer , callback );
112- WebSocketMessage webSocketMessage = new WebSocketMessage (Type .PONG , buffer );
113- this .delegateSession .handleMessage (webSocketMessage .getType (), webSocketMessage );
114- return ;
115- }
116- }
117-
118- callback .succeed ();
108+ @ Override
109+ public void onWebSocketPong (ByteBuffer payload ) {
110+ DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (BufferUtil .copy (payload ));
111+ WebSocketMessage webSocketMessage = new WebSocketMessage (Type .PONG , buffer );
112+ this .delegateSession .handleMessage (webSocketMessage );
119113 }
120114
121- @ OnWebSocketClose
115+ @ Override
122116 public void onWebSocketClose (int statusCode , String reason ) {
123- if (this .delegateSession != null ) {
124- this .delegateSession .handleClose (CloseStatus .create (statusCode , reason ));
125- }
117+ this .delegateSession .handleClose (CloseStatus .create (statusCode , reason ));
126118 }
127119
128- @ OnWebSocketError
120+ @ Override
129121 public void onWebSocketError (Throwable cause ) {
130- if (this .delegateSession != null ) {
131- this .delegateSession .handleError (cause );
132- }
122+ this .delegateSession .handleError (cause );
133123 }
134124
135125
0 commit comments