@@ -64,7 +64,10 @@ internal class JavaHttpWebSocket(
64
64
private val requestTime : GMTDate = GMTDate ()
65
65
) : WebSocket.Listener, WebSocketSession {
66
66
67
- private lateinit var webSocket: WebSocket
67
+ private var _webSocket : WebSocket ? = null
68
+ private val webSocket: WebSocket
69
+ get() = checkNotNull(_webSocket ) { " Web socket is not connected yet." }
70
+
68
71
private val socketJob = Job (callContext[Job ])
69
72
private val _incoming = Channel <Frame >(Channel .UNLIMITED )
70
73
private val _outgoing = Channel <Frame >(Channel .UNLIMITED )
@@ -90,50 +93,45 @@ internal class JavaHttpWebSocket(
90
93
get() = emptyList()
91
94
92
95
init {
93
- launch {
96
+ launch( CoroutineName ( " java-ws-outgoing " )) {
94
97
_outgoing .consumeEach { frame ->
95
- when (frame.frameType) {
96
- FrameType .TEXT -> {
97
- webSocket.sendText(String (frame.data), frame.fin).await()
98
- }
99
-
100
- FrameType .BINARY -> {
101
- webSocket.sendBinary(frame.buffer, frame.fin).await()
102
- }
103
-
104
- FrameType .CLOSE -> {
105
- val data = buildPacket { writeFully(frame.data) }
106
- val code = data.readShort().toInt()
107
- val reason = data.readText()
108
- webSocket.sendClose(code, reason).await()
109
- socketJob.complete()
110
- return @launch
111
- }
112
-
113
- FrameType .PING -> {
114
- webSocket.sendPing(frame.buffer).await()
115
- }
116
-
117
- FrameType .PONG -> {
118
- webSocket.sendPong(frame.buffer).await()
119
- }
98
+ webSocket.sendFrame(frame)
99
+ if (frame.frameType == FrameType .CLOSE ) {
100
+ socketJob.complete()
101
+ return @launch
120
102
}
121
103
}
122
104
}
123
105
124
- GlobalScope .launch(callContext, start = CoroutineStart .ATOMIC ) {
106
+ GlobalScope .launch(callContext + CoroutineName ( " java-ws-closer " ) , start = CoroutineStart .ATOMIC ) {
125
107
try {
126
108
socketJob[Job ]!! .join()
127
109
} catch (cause: Throwable ) {
128
110
val code = CloseReason .Codes .INTERNAL_ERROR .code.toInt()
129
- webSocket .sendClose(code, " Client failed" )
111
+ _webSocket ? .sendClose(code, " Client failed" )
130
112
} finally {
131
113
_incoming .close()
132
114
_outgoing .cancel()
133
115
}
134
116
}
135
117
}
136
118
119
+ private suspend fun WebSocket.sendFrame (frame : Frame ) {
120
+ when (frame.frameType) {
121
+ FrameType .TEXT -> sendText(String (frame.data), frame.fin).await()
122
+ FrameType .BINARY -> sendBinary(frame.buffer, frame.fin).await()
123
+ FrameType .PING -> sendPing(frame.buffer).await()
124
+ FrameType .PONG -> sendPong(frame.buffer).await()
125
+
126
+ FrameType .CLOSE -> {
127
+ val data = buildPacket { writeFully(frame.data) }
128
+ val code = data.readShort().toInt()
129
+ val reason = data.readText()
130
+ sendClose(code, reason).await()
131
+ }
132
+ }
133
+ }
134
+
137
135
@OptIn(InternalAPI ::class )
138
136
suspend fun getResponse (): HttpResponseData {
139
137
val builder = httpClient.newWebSocketBuilder()
@@ -163,7 +161,7 @@ internal class JavaHttpWebSocket(
163
161
var status = HttpStatusCode .SwitchingProtocols
164
162
var headers: Headers
165
163
try {
166
- webSocket = builder.buildAsync(requestData.url.toURI(), this ).await()
164
+ _webSocket = builder.buildAsync(requestData.url.toURI(), this ).await()
167
165
val protocol = webSocket.subprotocol?.takeIf { it.isNotEmpty() }
168
166
headers = if (protocol != null ) headersOf(HttpHeaders .SecWebSocketProtocol , protocol) else Headers .Empty
169
167
} catch (cause: WebSocketHandshakeException ) {
0 commit comments