Commit c31db61

Rework Node.js readReadable API, take 2

1 parent 1ebc47d

6 files changed: +128, -124 lines

io/js/src/main/scala/fs2/io/compressionplatform.scala (+25, -19)

@@ -42,16 +42,20 @@ private[io] trait compressionplatform {
         .setLevel(deflateParams.level.juzDeflaterLevel.toDouble)
         .setStrategy(deflateParams.strategy.juzDeflaterStrategy.toDouble)
         .setFlush(deflateParams.flushMode.juzDeflaterFlushMode.toDouble)
+
       Stream
-        .bracket(F.delay(deflateParams.header match {
-          case ZLibParams.Header.GZIP => zlibMod.createGzip(options)
-          case ZLibParams.Header.ZLIB => zlibMod.createDeflate(options)
-        }))(z => F.async_(cb => z.close(() => cb(Right(())))))
-        .flatMap { _deflate =>
-          val deflate = _deflate.asInstanceOf[Duplex].pure
-          Stream
-            .resource(readReadableResource[F](deflate.widen))
-            .flatMap(_.concurrently(in.through(writeWritable[F](deflate.widen))))
+        .resource(suspendReadableAndRead() {
+          (deflateParams.header match {
+            case ZLibParams.Header.GZIP => zlibMod.createGzip(options)
+            case ZLibParams.Header.ZLIB => zlibMod.createDeflate(options)
+          }).asInstanceOf[Duplex]
+        })
+        .flatMap { case (deflate, out) =>
+          out
+            .concurrently(in.through(writeWritable[F](deflate.pure.widen)))
+            .onFinalize(
+              F.async_[Unit](cb => deflate.asInstanceOf[zlibMod.Zlib].close(() => cb(Right(()))))
+            )
         }
     }
 
@@ -60,18 +64,20 @@ private[io] trait compressionplatform {
         .ZlibOptions()
         .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble)
       Stream
-        .bracket(F.delay(inflateParams.header match {
-          case ZLibParams.Header.GZIP => zlibMod.createGunzip(options)
-          case ZLibParams.Header.ZLIB => zlibMod.createInflate(options)
-        }))(z => F.async_(cb => z.close(() => cb(Right(())))))
-        .flatMap { _inflate =>
-          val inflate = _inflate.asInstanceOf[Duplex].pure
-          Stream
-            .resource(readReadableResource[F](inflate.widen))
-            .flatMap(_.concurrently(in.through(writeWritable[F](inflate.widen))))
+        .resource(suspendReadableAndRead() {
+          (inflateParams.header match {
+            case ZLibParams.Header.GZIP => zlibMod.createGunzip(options)
+            case ZLibParams.Header.ZLIB => zlibMod.createInflate(options)
+          }).asInstanceOf[Duplex]
+        })
+        .flatMap { case (inflate, out) =>
+          out
+            .concurrently(in.through(writeWritable[F](inflate.pure.widen)))
+            .onFinalize(
+              F.async_[Unit](cb => inflate.asInstanceOf[zlibMod.Zlib].close(() => cb(Right(()))))
+            )
         }
     }
 
   }
-
 }
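Both hunks now follow the same shape: suspend creation of the zlib `Duplex` together with the stream of its output, feed the input bytes to it concurrently, and close the native handle when the output stream finalizes. A minimal sketch of that shape in isolation (the helper name `throughDuplex` is hypothetical and not part of this commit; it assumes the same identifiers that are already in scope in `compressionplatform.scala`, i.e. `Duplex`, `suspendReadableAndRead`, `writeWritable` and cats syntax):

    // Sketch only: generic "pipe bytes through a Node.js Duplex" helper,
    // mirroring the deflate/inflate wiring in the diff above.
    def throughDuplex[F[_]](mkDuplex: => Duplex)(in: Stream[F, Byte])(implicit
        F: Async[F]
    ): Stream[F, Byte] =
      Stream
        .resource(suspendReadableAndRead()(mkDuplex)) // yields (duplex, output bytes)
        .flatMap { case (duplex, out) =>
          // emit whatever the duplex produces while concurrently writing the input into it
          out.concurrently(in.through(writeWritable[F](duplex.pure.widen)))
        }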

io/js/src/main/scala/fs2/io/file/FilesPlatform.scala (+13, -30)

@@ -302,37 +302,20 @@ private[fs2] trait FilesCompanionPlatform {
         f: fsMod.ReadStreamOptions => fsMod.ReadStreamOptions
     ): Stream[F, Byte] =
       Stream
-        .resource(
-          readReadableResource(
-            F.async_[Readable] { cb =>
-              val rs = fsMod
-                .createReadStream(
-                  path.toString,
-                  f(
-                    js.Dynamic
-                      .literal(flags = combineFlags(flags))
-                      .asInstanceOf[fsMod.ReadStreamOptions]
-                      .setHighWaterMark(chunkSize.toDouble)
-                  )
-                )
-              rs.once_ready(
-                nodeStrings.ready,
-                () => {
-                  rs.asInstanceOf[eventsMod.EventEmitter].removeAllListeners()
-                  cb(Right(rs.asInstanceOf[Readable]))
-                }
-              )
-              rs.once_error(
-                nodeStrings.error,
-                error => {
-                  rs.asInstanceOf[eventsMod.EventEmitter].removeAllListeners()
-                  cb(Left(js.JavaScriptException(error)))
-                }
+        .resource(suspendReadableAndRead() {
+          fsMod
+            .createReadStream(
+              path.toString,
+              f(
+                js.Dynamic
+                  .literal(flags = combineFlags(flags))
+                  .asInstanceOf[fsMod.ReadStreamOptions]
+                  .setHighWaterMark(chunkSize.toDouble)
               )
-            }
-          )
-        )
-        .flatten
+            )
+            .asInstanceOf[Readable]
+        })
+        .flatMap(_._2)
 
     override def readAll(path: Path, chunkSize: Int, flags: Flags): Stream[F, Byte] =
       readStream(path, chunkSize, flags)(identity)
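Because `suspendReadableAndRead` yields both the `Readable` and its byte stream, `readStream` keeps only the stream via `.flatMap(_._2)`. Written with explicit destructuring, the tail of the pipeline is equivalent to this sketch (the `options` value stands in for the configured `ReadStreamOptions` built above):

    Stream
      .resource(suspendReadableAndRead() {
        // `options` is a stand-in for the configured fsMod.ReadStreamOptions from the diff
        fsMod.createReadStream(path.toString, options).asInstanceOf[Readable]
      })
      .flatMap { case (_, bytes) => bytes } // the ReadStream handle is not needed downstream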

io/js/src/main/scala/fs2/io/ioplatform.scala (+60, -47)

@@ -22,6 +22,7 @@
 package fs2
 package io
 
+import cats.effect.SyncIO
 import cats.effect.kernel.Async
 import cats.effect.kernel.Resource
 import cats.effect.std.Dispatcher
@@ -41,67 +42,79 @@ import scala.scalajs.js.|
 
 private[fs2] trait ioplatform {
 
-  @deprecated("Use readReadableResource for safer variant", "3.1.4")
+  @deprecated("Use suspendReadableAndRead instead", "3.1.4")
   def readReadable[F[_]](
       readable: F[Readable],
       destroyIfNotEnded: Boolean = true,
       destroyIfCanceled: Boolean = true
   )(implicit
       F: Async[F]
-  ): Stream[F, Byte] =
-    Stream.resource(readReadableResource(readable, destroyIfNotEnded, destroyIfCanceled)).flatten
+  ): Stream[F, Byte] = Stream
+    .eval(readable)
+    .flatMap(r => Stream.resource(suspendReadableAndRead(destroyIfNotEnded, destroyIfCanceled)(r)))
+    .flatMap(_._2)
 
-  /** Reads all bytes from the specified `Readable`.
-    * Note that until the resource is opened, it is not safe to use the `Readable`
-    * without risking loss of data or events (e.g., termination, error).
+  /** Suspends the creation of a `Readable` and a `Stream` that reads all bytes from that `Readable`.
     */
-  def readReadableResource[F[_]](
-      readable: F[Readable],
+  def suspendReadableAndRead[F[_], R <: Readable](
       destroyIfNotEnded: Boolean = true,
       destroyIfCanceled: Boolean = true
-  )(implicit
-      F: Async[F]
-  ): Resource[F, Stream[F, Byte]] =
+  )(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] =
     (for {
-      readable <- Resource.makeCase(readable.map(_.asInstanceOf[streamMod.Readable])) {
-        case (readable, Resource.ExitCase.Succeeded) =>
-          F.delay {
-            if (!readable.readableEnded & destroyIfNotEnded)
-              readable.destroy()
-          }
-        case (readable, Resource.ExitCase.Errored(ex)) =>
-          F.delay(readable.destroy(js.Error(ex.getMessage())))
-        case (readable, Resource.ExitCase.Canceled) =>
-          if (destroyIfCanceled)
-            F.delay(readable.destroy())
-          else
-            F.unit
-      }
      dispatcher <- Dispatcher[F]
      queue <- Queue.synchronous[F, Option[Unit]].toResource
      error <- F.deferred[Throwable].toResource
-      _ <- registerListener0(readable, nodeStrings.readable)(_.on_readable(_, _)) { () =>
-        dispatcher.unsafeRunAndForget(queue.offer(Some(())))
-      }
-      _ <- registerListener0(readable, nodeStrings.end)(_.on_end(_, _)) { () =>
-        dispatcher.unsafeRunAndForget(queue.offer(None))
-      }
-      _ <- registerListener0(readable, nodeStrings.close)(_.on_close(_, _)) { () =>
-        dispatcher.unsafeRunAndForget(queue.offer(None))
-      }
-      _ <- registerListener[js.Error](readable, nodeStrings.error)(_.on_error(_, _)) { e =>
-        dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e)))
-      }
-    } yield (Stream
-      .fromQueueNoneTerminated(queue)
-      .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
-      Stream
-        .evalUnChunk(
-          F.delay(
-            Option(readable.read().asInstanceOf[bufferMod.global.Buffer])
-              .fold(Chunk.empty[Byte])(_.toChunk)
-          )
-        )).adaptError { case IOException(ex) => ex }).adaptError { case IOException(ex) => ex }
+      // Implementation Note: why suspend in `SyncIO` and then `unsafeRunSync()` inside `F.delay`?
+      // In many cases creating a `Readable` starts async side-effects (e.g. negotiating TLS handshake or opening a file handle).
+      // Furthermore, these side-effects will invoke the listeners we register to the `Readable`.
+      // Therefore, it is critical that the listeners are registered to the `Readable` _before_ these async side-effects occur:
+      // in other words, before we next yield (cede) to the event loop. Because an arbitrary effect `F` (particularly `IO`) may cede at any time,
+      // our only recourse is to suspend the entire creation/listener registration process within a single atomic `delay`.
+      readableResource = for {
+        readable <- Resource.makeCase(SyncIO(thunk).map(_.asInstanceOf[streamMod.Readable])) {
+          case (readable, Resource.ExitCase.Succeeded) =>
+            SyncIO {
+              if (!readable.readableEnded & destroyIfNotEnded)
+                readable.destroy()
+            }
+          case (readable, Resource.ExitCase.Errored(ex)) =>
+            SyncIO(readable.destroy(js.Error(ex.getMessage())))
+          case (readable, Resource.ExitCase.Canceled) =>
+            if (destroyIfCanceled)
+              SyncIO(readable.destroy())
+            else
+              SyncIO.unit
+        }
+        _ <- registerListener0(readable, nodeStrings.readable)(_.on_readable(_, _)) { () =>
+          dispatcher.unsafeRunAndForget(queue.offer(Some(())))
+        }(SyncIO.syncForSyncIO)
+        _ <- registerListener0(readable, nodeStrings.end)(_.on_end(_, _)) { () =>
+          dispatcher.unsafeRunAndForget(queue.offer(None))
+        }(SyncIO.syncForSyncIO)
+        _ <- registerListener0(readable, nodeStrings.close)(_.on_close(_, _)) { () =>
+          dispatcher.unsafeRunAndForget(queue.offer(None))
+        }(SyncIO.syncForSyncIO)
+        _ <- registerListener[js.Error](readable, nodeStrings.error)(_.on_error(_, _)) { e =>
+          dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e)))
+        }(SyncIO.syncForSyncIO)
+      } yield readable
+      readable <- Resource
+        .make(F.delay {
+          readableResource.allocated.unsafeRunSync()
+        }) { case (_, close) => close.to[F] }
+        .map(_._1)
+      stream =
+        (Stream
+          .fromQueueNoneTerminated(queue)
+          .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
+          Stream
+            .evalUnChunk(
+              F.delay(
+                Option(readable.read().asInstanceOf[bufferMod.global.Buffer])
+                  .fold(Chunk.empty[Byte])(_.toChunk)
+              )
+            )).adaptError { case IOException(ex) => ex }
+    } yield (readable.asInstanceOf[R], stream)).adaptError { case IOException(ex) => ex }
 
   /** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
     * that ends whenever the resulting stream terminates.
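The implementation note above is the core of the rework: the `Readable` must be created and all of its listeners attached within a single synchronous block, before control can return to the Node.js event loop. The new code does this by describing the acquisition as a resource in `SyncIO` and running it with `allocated.unsafeRunSync()` inside one `F.delay`. The same trick, distilled into a standalone sketch (the name `suspendResource` is hypothetical, not part of fs2):

    import cats.effect.SyncIO
    import cats.effect.kernel.{Async, Resource}

    // Embed a synchronously-described resource into F so that acquisition happens in one
    // atomic delay (no async boundary between creating the Readable and wiring up its
    // listeners), while release is still tracked by the outer Resource[F, *].
    def suspendResource[F[_], A](r: Resource[SyncIO, A])(implicit F: Async[F]): Resource[F, A] =
      Resource
        .make(F.delay(r.allocated.unsafeRunSync())) { case (_, release) => release.to[F] }
        .map(_._1)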

io/js/src/main/scala/fs2/io/net/SocketPlatform.scala (+5, -4)

@@ -42,12 +42,13 @@ private[net] trait SocketCompanionPlatform {
   private[net] def forAsync[F[_]](
       sock: netMod.Socket
   )(implicit F: Async[F]): Resource[F, Socket[F]] =
-    readReadableResource(
-      F.delay(sock.asInstanceOf[Readable]),
+    suspendReadableAndRead(
       destroyIfNotEnded = false,
       destroyIfCanceled = false
-    ).flatMap(SuspendedStream(_))
-      .map(new AsyncSocket(sock, _))
+    )(sock.asInstanceOf[Readable])
+      .flatMap { case (_, stream) =>
+        SuspendedStream(stream).map(new AsyncSocket(sock, _))
+      }
      .onFinalize {
        F.delay {
          if (!sock.destroyed)

io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala (+22, -21)

@@ -67,34 +67,35 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>
            errorDef.complete(IOException.unapply(ex).getOrElse(ex))
          )
      }: js.Function1[js.Error, Unit]
-      tlsSock <- Resource.make {
-        F.delay {
-          val tlsSock = upgrade(duplex.asInstanceOf[streamMod.Duplex])
-          tlsSock.on_session(nodeStrings.session, sessionListener)
-          tlsSock.asInstanceOf[netMod.Socket].on_error(nodeStrings.error, errorListener)
-          tlsSock
-        }
-      } { tlsSock =>
-        F.delay {
-          val eventEmitter = tlsSock.asInstanceOf[eventsMod.EventEmitter]
-          eventEmitter.removeListener(
-            "session",
-            sessionListener.asInstanceOf[js.Function1[Any, Unit]]
-          )
-          eventEmitter.removeListener("error", errorListener.asInstanceOf[js.Function1[Any, Unit]])
-        }
-      }
-      readable <- readReadableResource(
-        F.delay(tlsSock.asInstanceOf[Readable]),
+      tlsSockReadable <- suspendReadableAndRead(
        destroyIfNotEnded = false,
        destroyIfCanceled = false
-      )
+      ) {
+        val tlsSock = upgrade(duplex.asInstanceOf[streamMod.Duplex])
+        tlsSock.on_session(nodeStrings.session, sessionListener)
+        tlsSock.asInstanceOf[netMod.Socket].on_error(nodeStrings.error, errorListener)
+        tlsSock.asInstanceOf[Readable]
+      }
+        .flatMap { case tlsSockReadable @ (tlsSock, _) =>
+          Resource.pure(tlsSockReadable).onFinalize {
+            F.delay {
+              val eventEmitter = tlsSock.asInstanceOf[eventsMod.EventEmitter]
+              eventEmitter.removeListener(
+                "session",
+                sessionListener.asInstanceOf[js.Function1[Any, Unit]]
+              )
+              eventEmitter
+                .removeListener("error", errorListener.asInstanceOf[js.Function1[Any, Unit]])
+            }
+          }
+        }
+      (tlsSock, readable) = tlsSockReadable
      readStream <- SuspendedStream(
        readable
      ).race(errorDef.get.flatMap(F.raiseError[SuspendedStream[F, Byte]]).toResource)
        .map(_.merge)
    } yield new AsyncTLSSocket(
-      tlsSock,
+      tlsSock.asInstanceOf[tlsMod.TLSSocket],
      readStream,
      sessionRef.discrete.unNone.head
        .concurrently(Stream.eval(errorDef.get.flatMap(F.raiseError[Unit])))
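The TLS socket is where that atomicity matters most: per the implementation note, upgrading the duplex can immediately start handshake side-effects, so the session and error listeners must be attached in the same synchronous block in which `suspendReadableAndRead` registers its own listeners. The call above therefore moves the whole setup into the thunk; its shape, reduced to essentials (a sketch reusing identifiers from the diff, with the `onFinalize` that removes the listeners elided):

    suspendReadableAndRead(destroyIfNotEnded = false, destroyIfCanceled = false) {
      // everything here runs in the same F.delay in which the library attaches
      // its 'readable' / 'end' / 'close' / 'error' listeners
      val tlsSock = upgrade(duplex.asInstanceOf[streamMod.Duplex])
      tlsSock.on_session(nodeStrings.session, sessionListener)
      tlsSock.asInstanceOf[netMod.Socket].on_error(nodeStrings.error, errorListener)
      tlsSock.asInstanceOf[Readable]
    } // : Resource[F, (Readable, Stream[F, Byte])]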

io/js/src/test/scala/fs2/io/IoPlatformSuite.scala (+3, -3)

@@ -33,7 +33,7 @@ class IoPlatformSuite extends Fs2Suite {
      bytes
        .through(toReadable[IO])
        .flatMap { readable =>
-          Stream.resource(readReadableResource(IO.pure(readable))).flatten
+          Stream.resource(suspendReadableAndRead[IO, Readable]()(readable)).flatMap(_._2)
        }
        .compile
        .toVector
@@ -55,8 +55,8 @@ class IoPlatformSuite extends Fs2Suite {
        .through {
          toDuplexAndRead[IO] { duplex =>
            Stream
-              .resource(readReadableResource[IO](IO.pure(duplex)))
-              .flatten
+              .resource(suspendReadableAndRead[IO, Duplex]()(duplex))
+              .flatMap(_._2)
              .merge(bytes2.covary[IO].through(writeWritable[IO](IO.pure(duplex))))
              .compile
              .toVector
