Skip to content

Commit 7f5fedf

Browse files
authored
Merge pull request #2663 from armanbilge/bug/read-readable
Re-worked Node.js `readReadable` API
2 parents 6149851 + c31db61 commit 7f5fedf

8 files changed

+128
-113
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform)
186186
"org.typelevel" %%% "cats-effect-laws" % "3.2.9" % Test,
187187
"org.typelevel" %%% "cats-effect-testkit" % "3.2.9" % Test,
188188
"org.scodec" %%% "scodec-bits" % "1.1.29",
189-
"org.typelevel" %%% "scalacheck-effect-munit" % "1.0.2" % Test,
189+
"org.typelevel" %%% "scalacheck-effect-munit" % "1.0.3" % Test,
190190
"org.typelevel" %%% "munit-cats-effect-3" % "1.0.6" % Test,
191191
"org.typelevel" %%% "discipline-munit" % "1.0.9" % Test
192192
),

io/js/src/main/scala/fs2/io/compressionplatform.scala

+23-14
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,20 @@ private[io] trait compressionplatform {
4242
.setLevel(deflateParams.level.juzDeflaterLevel.toDouble)
4343
.setStrategy(deflateParams.strategy.juzDeflaterStrategy.toDouble)
4444
.setFlush(deflateParams.flushMode.juzDeflaterFlushMode.toDouble)
45+
4546
Stream
46-
.eval(F.delay((deflateParams.header match {
47-
case ZLibParams.Header.GZIP => zlibMod.createGzip(options)
48-
case ZLibParams.Header.ZLIB => zlibMod.createDeflate(options)
49-
}).asInstanceOf[Duplex]))
50-
.flatMap { deflate =>
51-
readReadable[F](deflate.pure.widen)
47+
.resource(suspendReadableAndRead() {
48+
(deflateParams.header match {
49+
case ZLibParams.Header.GZIP => zlibMod.createGzip(options)
50+
case ZLibParams.Header.ZLIB => zlibMod.createDeflate(options)
51+
}).asInstanceOf[Duplex]
52+
})
53+
.flatMap { case (deflate, out) =>
54+
out
5255
.concurrently(in.through(writeWritable[F](deflate.pure.widen)))
56+
.onFinalize(
57+
F.async_[Unit](cb => deflate.asInstanceOf[zlibMod.Zlib].close(() => cb(Right(()))))
58+
)
5359
}
5460
}
5561

@@ -58,17 +64,20 @@ private[io] trait compressionplatform {
5864
.ZlibOptions()
5965
.setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble)
6066
Stream
61-
.eval(F.delay((inflateParams.header match {
62-
case ZLibParams.Header.GZIP => zlibMod.createGunzip(options)
63-
case ZLibParams.Header.ZLIB => zlibMod.createInflate(options)
64-
}).asInstanceOf[Duplex]))
65-
.flatMap { inflate =>
66-
readReadable[F](inflate.pure.widen)
67+
.resource(suspendReadableAndRead() {
68+
(inflateParams.header match {
69+
case ZLibParams.Header.GZIP => zlibMod.createGunzip(options)
70+
case ZLibParams.Header.ZLIB => zlibMod.createInflate(options)
71+
}).asInstanceOf[Duplex]
72+
})
73+
.flatMap { case (inflate, out) =>
74+
out
6775
.concurrently(in.through(writeWritable[F](inflate.pure.widen)))
68-
76+
.onFinalize(
77+
F.async_[Unit](cb => inflate.asInstanceOf[zlibMod.Zlib].close(() => cb(Right(()))))
78+
)
6979
}
7080
}
7181

7282
}
73-
7483
}

io/js/src/main/scala/fs2/io/file/FilesPlatform.scala

+6-19
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,9 @@ private[fs2] trait FilesCompanionPlatform {
301301
private def readStream(path: Path, chunkSize: Int, flags: Flags)(
302302
f: fsMod.ReadStreamOptions => fsMod.ReadStreamOptions
303303
): Stream[F, Byte] =
304-
readReadable(
305-
F.async_[Readable] { cb =>
306-
val rs = fsMod
304+
Stream
305+
.resource(suspendReadableAndRead() {
306+
fsMod
307307
.createReadStream(
308308
path.toString,
309309
f(
@@ -313,22 +313,9 @@ private[fs2] trait FilesCompanionPlatform {
313313
.setHighWaterMark(chunkSize.toDouble)
314314
)
315315
)
316-
rs.once_ready(
317-
nodeStrings.ready,
318-
() => {
319-
rs.asInstanceOf[eventsMod.EventEmitter].removeAllListeners()
320-
cb(Right(rs.asInstanceOf[Readable]))
321-
}
322-
)
323-
rs.once_error(
324-
nodeStrings.error,
325-
error => {
326-
rs.asInstanceOf[eventsMod.EventEmitter].removeAllListeners()
327-
cb(Left(js.JavaScriptException(error)))
328-
}
329-
)
330-
}
331-
)
316+
.asInstanceOf[Readable]
317+
})
318+
.flatMap(_._2)
332319

333320
override def readAll(path: Path, chunkSize: Int, flags: Flags): Stream[F, Byte] =
334321
readStream(path, chunkSize, flags)(identity)

io/js/src/main/scala/fs2/io/ioplatform.scala

+64-26
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package fs2
2323
package io
2424

25+
import cats.effect.SyncIO
2526
import cats.effect.kernel.Async
2627
import cats.effect.kernel.Resource
2728
import cats.effect.std.Dispatcher
@@ -41,58 +42,83 @@ import scala.scalajs.js.|
4142

4243
private[fs2] trait ioplatform {
4344

45+
@deprecated("Use suspendReadableAndRead instead", "3.1.4")
4446
def readReadable[F[_]](
4547
readable: F[Readable],
4648
destroyIfNotEnded: Boolean = true,
4749
destroyIfCanceled: Boolean = true
4850
)(implicit
4951
F: Async[F]
50-
): Stream[F, Byte] =
51-
Stream
52-
.resource(for {
53-
readable <- Resource.makeCase(readable.map(_.asInstanceOf[streamMod.Readable])) {
52+
): Stream[F, Byte] = Stream
53+
.eval(readable)
54+
.flatMap(r => Stream.resource(suspendReadableAndRead(destroyIfNotEnded, destroyIfCanceled)(r)))
55+
.flatMap(_._2)
56+
57+
/** Suspends the creation of a `Readable` and a `Stream` that reads all bytes from that `Readable`.
58+
*/
59+
def suspendReadableAndRead[F[_], R <: Readable](
60+
destroyIfNotEnded: Boolean = true,
61+
destroyIfCanceled: Boolean = true
62+
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] =
63+
(for {
64+
dispatcher <- Dispatcher[F]
65+
queue <- Queue.synchronous[F, Option[Unit]].toResource
66+
error <- F.deferred[Throwable].toResource
67+
// Implementation Note: why suspend in `SyncIO` and then `unsafeRunSync()` inside `F.delay`?
68+
// In many cases creating a `Readable` starts async side-effects (e.g. negotiating TLS handshake or opening a file handle).
69+
// Furthermore, these side-effects will invoke the listeners we register to the `Readable`.
70+
// Therefore, it is critical that the listeners are registered to the `Readable` _before_ these async side-effects occur:
71+
// in other words, before we next yield (cede) to the event loop. Because an arbitrary effect `F` (particularly `IO`) may cede at any time,
72+
// our only recourse is to suspend the entire creation/listener registration process within a single atomic `delay`.
73+
readableResource = for {
74+
readable <- Resource.makeCase(SyncIO(thunk).map(_.asInstanceOf[streamMod.Readable])) {
5475
case (readable, Resource.ExitCase.Succeeded) =>
55-
F.delay {
76+
SyncIO {
5677
if (!readable.readableEnded & destroyIfNotEnded)
5778
readable.destroy()
5879
}
5980
case (readable, Resource.ExitCase.Errored(ex)) =>
60-
F.delay(readable.destroy(js.Error(ex.getMessage())))
81+
SyncIO(readable.destroy(js.Error(ex.getMessage())))
6182
case (readable, Resource.ExitCase.Canceled) =>
6283
if (destroyIfCanceled)
63-
F.delay(readable.destroy())
84+
SyncIO(readable.destroy())
6485
else
65-
F.unit
86+
SyncIO.unit
6687
}
67-
dispatcher <- Dispatcher[F]
68-
queue <- Queue.synchronous[F, Option[Unit]].toResource
69-
error <- F.deferred[Throwable].toResource
7088
_ <- registerListener0(readable, nodeStrings.readable)(_.on_readable(_, _)) { () =>
7189
dispatcher.unsafeRunAndForget(queue.offer(Some(())))
72-
}
90+
}(SyncIO.syncForSyncIO)
7391
_ <- registerListener0(readable, nodeStrings.end)(_.on_end(_, _)) { () =>
7492
dispatcher.unsafeRunAndForget(queue.offer(None))
75-
}
93+
}(SyncIO.syncForSyncIO)
7694
_ <- registerListener0(readable, nodeStrings.close)(_.on_close(_, _)) { () =>
7795
dispatcher.unsafeRunAndForget(queue.offer(None))
78-
}
96+
}(SyncIO.syncForSyncIO)
7997
_ <- registerListener[js.Error](readable, nodeStrings.error)(_.on_error(_, _)) { e =>
8098
dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e)))
81-
}
82-
} yield (readable, queue, error))
83-
.flatMap { case (readable, queue, error) =>
84-
Stream
99+
}(SyncIO.syncForSyncIO)
100+
} yield readable
101+
readable <- Resource
102+
.make(F.delay {
103+
readableResource.allocated.unsafeRunSync()
104+
}) { case (_, close) => close.to[F] }
105+
.map(_._1)
106+
stream =
107+
(Stream
85108
.fromQueueNoneTerminated(queue)
86109
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
87-
Stream.evalUnChunk(
88-
F.delay(
89-
Option(readable.read().asInstanceOf[bufferMod.global.Buffer])
90-
.fold(Chunk.empty[Byte])(_.toChunk)
91-
)
92-
)
93-
}
94-
.adaptError { case IOException(ex) => ex }
110+
Stream
111+
.evalUnChunk(
112+
F.delay(
113+
Option(readable.read().asInstanceOf[bufferMod.global.Buffer])
114+
.fold(Chunk.empty[Byte])(_.toChunk)
115+
)
116+
)).adaptError { case IOException(ex) => ex }
117+
} yield (readable.asInstanceOf[R], stream)).adaptError { case IOException(ex) => ex }
95118

119+
/** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
120+
* that ends whenever the resulting stream terminates.
121+
*/
96122
def toReadable[F[_]](implicit F: Async[F]): Pipe[F, Byte, Readable] =
97123
in =>
98124
Stream
@@ -111,9 +137,13 @@ private[fs2] trait ioplatform {
111137
}
112138
.adaptError { case IOException(ex) => ex }
113139

140+
/** Like [[toReadable]] but returns a `Resource` rather than a single element stream.
141+
*/
114142
def toReadableResource[F[_]: Async](s: Stream[F, Byte]): Resource[F, Readable] =
115143
s.through(toReadable).compile.resource.lastOrError
116144

145+
/** Writes all bytes to the specified `Writable`.
146+
*/
117147
def writeWritable[F[_]](
118148
writable: F[Writable],
119149
endAfterUse: Boolean = true
@@ -147,9 +177,17 @@ private[fs2] trait ioplatform {
147177
}
148178
.adaptError { case IOException(ex) => ex }
149179

180+
/** Take a function that emits to a `Writable` effectfully,
181+
* and return a stream which, when run, will perform that function and emit
182+
* the bytes recorded in the `Writable` as an fs2.Stream
183+
*/
150184
def readWritable[F[_]: Async](f: Writable => F[Unit]): Stream[F, Byte] =
151185
Stream.empty.through(toDuplexAndRead(f))
152186

187+
/** Take a function that reads and writes from a `Duplex` effectfully,
188+
* and return a pipe which, when run, will perform that function,
189+
* write emitted bytes to the duplex, and read emitted bytes from the duplex
190+
*/
153191
def toDuplexAndRead[F[_]: Async](f: Duplex => F[Unit]): Pipe[F, Byte, Byte] =
154192
in =>
155193
Stream.resource(mkDuplex(in)).flatMap { case (duplex, out) =>

io/js/src/main/scala/fs2/io/net/SocketPlatform.scala

+6-26
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import cats.syntax.all._
3131
import com.comcast.ip4s.IpAddress
3232
import com.comcast.ip4s.Port
3333
import com.comcast.ip4s.SocketAddress
34-
import fs2.internal.jsdeps.node.eventsMod
3534
import fs2.internal.jsdeps.node.netMod
3635
import fs2.internal.jsdeps.node.streamMod
3736
import fs2.io.internal.SuspendedStream
@@ -43,32 +42,13 @@ private[net] trait SocketCompanionPlatform {
4342
private[net] def forAsync[F[_]](
4443
sock: netMod.Socket
4544
)(implicit F: Async[F]): Resource[F, Socket[F]] =
46-
SuspendedStream(
47-
readReadable(
48-
F.delay(sock.asInstanceOf[Readable]),
49-
destroyIfNotEnded = false,
50-
destroyIfCanceled = false
51-
)
52-
).evalTap { _ =>
53-
// Block until an error listener is registered
54-
F.async_[Unit] { cb =>
55-
val emitter = sock.asInstanceOf[eventsMod.EventEmitter]
56-
if (emitter.listenerCount("error") > 0)
57-
cb(Right(()))
58-
else {
59-
def go(): Unit =
60-
emitter.once(
61-
"newListener",
62-
eventName =>
63-
if (eventName.asInstanceOf[String] == "error")
64-
cb(Right(()))
65-
else
66-
go()
67-
)
68-
go()
69-
}
45+
suspendReadableAndRead(
46+
destroyIfNotEnded = false,
47+
destroyIfCanceled = false
48+
)(sock.asInstanceOf[Readable])
49+
.flatMap { case (_, stream) =>
50+
SuspendedStream(stream).map(new AsyncSocket(sock, _))
7051
}
71-
}.map(new AsyncSocket(sock, _))
7252
.onFinalize {
7353
F.delay {
7454
if (!sock.destroyed)

io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala

+24-22
Original file line numberDiff line numberDiff line change
@@ -67,33 +67,35 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>
6767
errorDef.complete(IOException.unapply(ex).getOrElse(ex))
6868
)
6969
}: js.Function1[js.Error, Unit]
70-
tlsSock <- Resource.make {
71-
F.delay {
72-
val tlsSock = upgrade(duplex.asInstanceOf[streamMod.Duplex])
73-
tlsSock.on_session(nodeStrings.session, sessionListener)
74-
tlsSock.asInstanceOf[netMod.Socket].on_error(nodeStrings.error, errorListener)
75-
tlsSock
76-
}
77-
} { tlsSock =>
78-
F.delay {
79-
val eventEmitter = tlsSock.asInstanceOf[eventsMod.EventEmitter]
80-
eventEmitter.removeListener(
81-
"session",
82-
sessionListener.asInstanceOf[js.Function1[Any, Unit]]
83-
)
84-
eventEmitter.removeListener("error", errorListener.asInstanceOf[js.Function1[Any, Unit]])
85-
}
70+
tlsSockReadable <- suspendReadableAndRead(
71+
destroyIfNotEnded = false,
72+
destroyIfCanceled = false
73+
) {
74+
val tlsSock = upgrade(duplex.asInstanceOf[streamMod.Duplex])
75+
tlsSock.on_session(nodeStrings.session, sessionListener)
76+
tlsSock.asInstanceOf[netMod.Socket].on_error(nodeStrings.error, errorListener)
77+
tlsSock.asInstanceOf[Readable]
8678
}
79+
.flatMap { case tlsSockReadable @ (tlsSock, _) =>
80+
Resource.pure(tlsSockReadable).onFinalize {
81+
F.delay {
82+
val eventEmitter = tlsSock.asInstanceOf[eventsMod.EventEmitter]
83+
eventEmitter.removeListener(
84+
"session",
85+
sessionListener.asInstanceOf[js.Function1[Any, Unit]]
86+
)
87+
eventEmitter
88+
.removeListener("error", errorListener.asInstanceOf[js.Function1[Any, Unit]])
89+
}
90+
}
91+
}
92+
(tlsSock, readable) = tlsSockReadable
8793
readStream <- SuspendedStream(
88-
readReadable(
89-
F.delay(tlsSock.asInstanceOf[Readable]),
90-
destroyIfNotEnded = false,
91-
destroyIfCanceled = false
92-
).concurrently(Stream.eval(errorDef.get.flatMap(F.raiseError[Unit])))
94+
readable
9395
).race(errorDef.get.flatMap(F.raiseError[SuspendedStream[F, Byte]]).toResource)
9496
.map(_.merge)
9597
} yield new AsyncTLSSocket(
96-
tlsSock,
98+
tlsSock.asInstanceOf[tlsMod.TLSSocket],
9799
readStream,
98100
sessionRef.discrete.unNone.head
99101
.concurrently(Stream.eval(errorDef.get.flatMap(F.raiseError[Unit])))

io/js/src/test/scala/fs2/io/IoPlatformSuite.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class IoPlatformSuite extends Fs2Suite {
3333
bytes
3434
.through(toReadable[IO])
3535
.flatMap { readable =>
36-
readReadable(IO.pure(readable))
36+
Stream.resource(suspendReadableAndRead[IO, Readable]()(readable)).flatMap(_._2)
3737
}
3838
.compile
3939
.toVector
@@ -54,7 +54,9 @@ class IoPlatformSuite extends Fs2Suite {
5454
bytes1
5555
.through {
5656
toDuplexAndRead[IO] { duplex =>
57-
readReadable[IO](IO.pure(duplex))
57+
Stream
58+
.resource(suspendReadableAndRead[IO, Duplex]()(duplex))
59+
.flatMap(_._2)
5860
.merge(bytes2.covary[IO].through(writeWritable[IO](IO.pure(duplex))))
5961
.compile
6062
.toVector

io/js/src/test/scala/fs2/io/NodeJSCompressionSuite.scala

-3
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ import fs2.io.internal.ByteChunkOps._
2929

3030
class NodeJSCompressionSuite extends CompressionSuite {
3131

32-
override def scalaCheckTestParameters =
33-
super.scalaCheckTestParameters.withMaxSize(10000)
34-
3532
override def deflateStream(
3633
b: Array[Byte],
3734
level: Int,

0 commit comments

Comments
 (0)