@@ -19,30 +19,30 @@ package servlet
19
19
20
20
import cats .effect ._
21
21
import cats .effect .std .Dispatcher
22
- import cats .effect .std .Queue
23
22
import cats .syntax .all ._
24
23
import fs2 ._
25
24
import org .http4s .internal .bug
26
25
import org .log4s .getLogger
27
26
28
- import java .util .Arrays
29
27
import java .util .concurrent .atomic .AtomicReference
30
28
import javax .servlet .ReadListener
31
29
import javax .servlet .WriteListener
32
30
import javax .servlet .http .HttpServletRequest
33
31
import javax .servlet .http .HttpServletResponse
34
- import scala .annotation .nowarn
35
32
import scala .annotation .tailrec
36
33
37
34
/** Determines the mode of I/O used for reading request bodies and writing response bodies.
38
35
*/
39
36
sealed abstract class ServletIo [F [_]: Async ] {
40
37
protected [servlet] val F : Async [F ] = Async [F ]
41
38
42
- @ deprecated(" Prefer requestBody, which has access to a Dispatcher" , " 0.23.12" )
43
39
protected [servlet] def reader (servletRequest : HttpServletRequest ): EntityBody [F ]
44
40
45
- @ nowarn(" cat=deprecation" )
41
+ /** An alias for [[reader ]]. In the future, this will be optimized with
42
+ * the dispatcher.
43
+ *
44
+ * @param dispatcher currently ignored
45
+ */
46
46
def requestBody (
47
47
servletRequest : HttpServletRequest ,
48
48
dispatcher : Dispatcher [F ],
@@ -52,10 +52,13 @@ sealed abstract class ServletIo[F[_]: Async] {
52
52
}
53
53
54
54
/** May install a listener on the servlet response. */
55
- @ deprecated(" Prefer bodyWriter, which has access to a Dispatcher" , " 0.23.12" )
56
55
protected [servlet] def initWriter (servletResponse : HttpServletResponse ): BodyWriter [F ]
57
56
58
- @ nowarn(" cat=deprecation" )
57
+ /** An alias for [[initWriter ]]. In the future, this will be
58
+ * optimized with the dispatcher.
59
+ *
60
+ * @param dispatcher currently ignored
61
+ */
59
62
def bodyWriter (servletResponse : HttpServletResponse , dispatcher : Dispatcher [F ])(
60
63
response : Response [F ]
61
64
): F [Unit ] = {
@@ -206,73 +209,6 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl
206
209
}
207
210
}
208
211
209
- /* The queue implementation is influenced by ideas in jetty4s
210
- * https://github.com/IndiscriminateCoding/jetty4s/blob/0.0.10/server/src/main/scala/jetty4s/server/HttpResourceHandler.scala
211
- */
212
- override def requestBody (
213
- servletRequest : HttpServletRequest ,
214
- dispatcher : Dispatcher [F ],
215
- ): Stream [F , Byte ] = {
216
- sealed trait Read
217
- final case class Bytes (chunk : Chunk [Byte ]) extends Read
218
- case object End extends Read
219
- final case class Error (t : Throwable ) extends Read
220
-
221
- Stream .eval(F .delay(servletRequest.getInputStream)).flatMap { in =>
222
- Stream .eval(Queue .bounded[F , Read ](4 )).flatMap { q =>
223
- val readBody = Stream .exec(F .delay(in.setReadListener(new ReadListener {
224
- var buf : Array [Byte ] = _
225
- unsafeReplaceBuffer()
226
-
227
- def unsafeReplaceBuffer () =
228
- buf = new Array [Byte ](chunkSize)
229
-
230
- def onDataAvailable (): Unit = {
231
- def loopIfReady =
232
- F .delay(in.isReady()).flatMap {
233
- case true => go
234
- case false => F .unit
235
- }
236
-
237
- def go : F [Unit ] =
238
- F .delay(in.read(buf)).flatMap {
239
- case len if len == chunkSize =>
240
- // We used the whole buffer. Replace it new before next read.
241
- q.offer(Bytes (Chunk .array(buf))) >> F .delay(unsafeReplaceBuffer()) >> loopIfReady
242
- case len if len >= 0 =>
243
- // Got a partial chunk. Copy it, and reuse the current buffer.
244
- q.offer(Bytes (Chunk .array(Arrays .copyOf(buf, len)))) >> loopIfReady
245
- case _ =>
246
- F .unit
247
- }
248
-
249
- unsafeRunAndForget(go)
250
- }
251
-
252
- def onAllDataRead (): Unit =
253
- unsafeRunAndForget(q.offer(End ))
254
-
255
- def onError (t : Throwable ): Unit =
256
- unsafeRunAndForget(q.offer(Error (t)))
257
-
258
- def unsafeRunAndForget [A ](fa : F [A ]): Unit =
259
- dispatcher.unsafeRunAndForget(
260
- fa.onError { case t => F .delay(logger.error(t)(" Error in servlet read listener" )) }
261
- )
262
- })))
263
-
264
- def pullBody : Pull [F , Byte , Unit ] =
265
- Pull .eval(q.take).flatMap {
266
- case Bytes (chunk) => Pull .output(chunk) >> pullBody
267
- case End => Pull .done
268
- case Error (t) => Pull .raiseError[F ](t)
269
- }
270
-
271
- pullBody.stream.concurrently(readBody)
272
- }
273
- }
274
- }
275
-
276
212
override protected [servlet] def initWriter (
277
213
servletResponse : HttpServletResponse
278
214
): BodyWriter [F ] = {
@@ -367,74 +303,4 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl
367
303
.drain
368
304
}
369
305
}
370
-
371
- /* The queue implementation is influenced by ideas in jetty4s
372
- * https://github.com/IndiscriminateCoding/jetty4s/blob/0.0.10/server/src/main/scala/jetty4s/server/HttpResourceHandler.scala
373
- */
374
- override def bodyWriter (
375
- servletResponse : HttpServletResponse ,
376
- dispatcher : Dispatcher [F ],
377
- )(response : Response [F ]): F [Unit ] = {
378
- sealed trait Write
379
- final case class Bytes (chunk : Chunk [Byte ]) extends Write
380
- case object End extends Write
381
- case object Init extends Write
382
-
383
- val autoFlush = response.isChunked
384
-
385
- F .delay(servletResponse.getOutputStream).flatMap { out =>
386
- Queue .bounded[F , Write ](4 ).flatMap { q =>
387
- Deferred [F , Either [Throwable , Unit ]].flatMap { done =>
388
- val writeBody = F .delay(out.setWriteListener(new WriteListener {
389
- def onWritePossible (): Unit = {
390
- def loopIfReady = F .delay(out.isReady()).flatMap {
391
- case true => go
392
- case false => F .unit
393
- }
394
-
395
- def flush =
396
- if (autoFlush) {
397
- F .delay(out.isReady()).flatMap {
398
- case true => F .delay(out.flush()) >> loopIfReady
399
- case false => F .unit
400
- }
401
- } else
402
- loopIfReady
403
-
404
- def go : F [Unit ] =
405
- q.take.flatMap {
406
- case Bytes (slice : Chunk .ArraySlice [_]) =>
407
- F .delay(
408
- out.write(slice.values.asInstanceOf [Array [Byte ]], slice.offset, slice.length)
409
- ) >> flush
410
- case Bytes (chunk) =>
411
- F .delay(out.write(chunk.toArray)) >> flush
412
- case End =>
413
- F .delay(out.flush()) >> done.complete(Either .unit).attempt.void
414
- case Init =>
415
- if (autoFlush) flush else go
416
- }
417
-
418
- unsafeRunAndForget(go)
419
- }
420
- def onError (t : Throwable ): Unit =
421
- unsafeRunAndForget(done.complete(Left (t)))
422
-
423
- def unsafeRunAndForget [A ](fa : F [A ]): Unit =
424
- dispatcher.unsafeRunAndForget(
425
- fa.onError { case t => F .delay(logger.error(t)(" Error in servlet write listener" )) }
426
- )
427
- }))
428
-
429
- val writes = Stream .emit(Init ) ++ response.body.chunks.map(Bytes (_)) ++ Stream .emit(End )
430
-
431
- Stream
432
- .eval(writeBody >> done.get.rethrow)
433
- .mergeHaltL(writes.foreach(q.offer))
434
- .compile
435
- .drain
436
- }
437
- }
438
- }
439
- }
440
306
}
0 commit comments