Skip to content

Commit 8a0df13

Browse files
committed
Fix site docs
1 parent 7395a77 commit 8a0df13

File tree

4 files changed

+37
-38
lines changed

4 files changed

+37
-38
lines changed

io/shared/src/main/scala/fs2/io/net/Network.scala

+1-5
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,7 @@ import fs2.io.net.tls.TLSContext
5151
* In this example, the `F[_]` parameter to `send` requires the `Network` constraint instead
5252
* of requiring the much more powerful `Async` constraint.
5353
*
54-
* The `Network` instance has a set of global resources used for managing sockets. Alternatively,
55-
* use the `socketGroup` and `datagramSocketGroup` operations to manage the lifecycle of underlying
56-
* resources.
57-
*
58-
* An instance of `Network` is available for any effect `F` which has an `Async[F]` instance.
54+
* An instance of `Network` is available for any effect `F` which has a `LiftIO[F]` instance.
5955
*/
6056
sealed trait Network[F[_]]
6157
extends NetworkPlatform[F]

site/concurrency-primitives.md

-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ The program ends after 15 seconds when the signal interrupts the publishing of m
147147

148148
```scala mdoc:silent
149149
import scala.concurrent.duration._
150-
import scala.language.higherKinds
151150
import cats.effect.std.Console
152151
import cats.effect.{Clock, IO, IOApp, Temporal}
153152
import cats.syntax.all._

site/io.md

+35-31
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@ import cats.syntax.all._
3333
import com.comcast.ip4s._
3434

3535
def client[F[_]: MonadCancelThrow: Console: Network]: F[Unit] =
36-
Network[F].client(SocketAddress(host"localhost", port"5555")).use { socket =>
36+
Network[F].connect(SocketAddress(host"localhost", port"5555")).use { socket =>
3737
socket.write(Chunk.array("Hello, world!".getBytes)) >>
3838
socket.read(8192).flatMap { response =>
3939
Console[F].println(s"Response: $response")
4040
}
4141
}
4242
```
4343

44-
To open a socket that's connected to `localhost:5555`, we use the `client` method on the `Network` capability. The `Network` capability provides the runtime environment for the sockets it creates.
44+
To open a socket that's connected to `localhost:5555`, we use the `connect` method on the `Network` capability. The `Network` capability provides the runtime environment for the sockets it creates.
4545

46-
The `Network[F].client` method returns a `Resource[F, Socket[F]]` which automatically closes the socket after the resource has been used. To write data to the socket, we call `socket.write`, which takes a `Chunk[Byte]` and returns an `F[Unit]`. Once the write completes, we do a single read from the socket via `socket.read`, passing the maximum amount of bytes we want to read. The returns an `F[Option[Chunk[Byte]]]` -- `None` if the socket reaches end of input and `Some` if the read produced a chunk. Finally, we print the response to the console.
46+
The `Network[F].connect` method returns a `Resource[F, Socket[F]]` which automatically closes the socket after the resource has been used. To write data to the socket, we call `socket.write`, which takes a `Chunk[Byte]` and returns an `F[Unit]`. Once the write completes, we do a single read from the socket via `socket.read`, passing the maximum amount of bytes we want to read. The returns an `F[Option[Chunk[Byte]]]` -- `None` if the socket reaches end of input and `Some` if the read produced a chunk. Finally, we print the response to the console.
4747

4848
Note we aren't doing any binary message framing or packetization in this example. Hence, it's very possible for the single read to only receive a portion of the original message -- perhaps just the bytes for `"Hello, w"`. We can use FS2 streams to simplify this. The `Socket` trait defines stream operations -- `writes` and `reads`. We could rewrite this example using the stream operations like so:
4949

@@ -56,7 +56,7 @@ import cats.syntax.all._
5656
import com.comcast.ip4s._
5757

5858
def client[F[_]: MonadCancelThrow: Console: Network]: Stream[F, Unit] =
59-
Stream.resource(Network[F].client(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
59+
Stream.resource(Network[F].connect(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
6060
Stream("Hello, world!")
6161
.through(text.utf8.encode)
6262
.through(socket.writes) ++
@@ -76,7 +76,7 @@ This program won't end until the server side closes the socket or indicates ther
7676

7777
```scala mdoc:nest
7878
def client[F[_]: MonadCancelThrow: Console: Network]: Stream[F, Unit] =
79-
Stream.resource(Network[F].client(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
79+
Stream.resource(Network[F].connect(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
8080
Stream("Hello, world!")
8181
.interleave(Stream.constant("\n"))
8282
.through(text.utf8.encode)
@@ -95,23 +95,23 @@ To update the write side, we added `.interleave(Stream.constant("\n"))` before d
9595

9696
#### Handling Connection Errors
9797

98-
If a TCP connection cannot be established, `socketGroup.client` fails with a `java.net.ConnectException`. To automatically attempt a reconnection, we can handle the `ConnectException` and try connecting again.
98+
If a TCP connection cannot be established, `connect` fails with a `fs2.io.net.ConnectException`. To automatically attempt a reconnection, we can handle the `ConnectException` and try connecting again.
9999

100100
```scala mdoc:nest
101101
import scala.concurrent.duration._
102102
import cats.effect.Temporal
103-
import fs2.io.net.Socket
104-
import java.net.ConnectException
103+
import fs2.io.net.{ConnectException, Socket}
105104

106-
def connect[F[_]: Temporal: Network](address: SocketAddress[Host]): Stream[F, Socket[F]] =
107-
Stream.resource(Network[F].client(address))
105+
def retryingConnect[F[_]: Temporal: Network](address: SocketAddress[Host]): Stream[F, Socket[F]] =
106+
Stream.resource(Network[F].connect(address))
108107
.handleErrorWith {
109108
case _: ConnectException =>
110-
connect(address).delayBy(5.seconds)
109+
retryingConnect(address).delayBy(5.seconds)
110+
case other => Stream.raiseError(other)
111111
}
112112

113113
def client[F[_]: Temporal: Console: Network]: Stream[F, Unit] =
114-
connect(SocketAddress(host"localhost", port"5555")).flatMap { socket =>
114+
retryingConnect(SocketAddress(host"localhost", port"5555")).flatMap { socket =>
115115
Stream("Hello, world!")
116116
.interleave(Stream.constant("\n"))
117117
.through(text.utf8.encode)
@@ -126,7 +126,7 @@ def client[F[_]: Temporal: Console: Network]: Stream[F, Unit] =
126126
}
127127
```
128128

129-
We've extracted the `Network[IO].client` call in to a new method called `connect`. The connect method attempts to create a client and handles the `ConnectException`. Upon encountering the exception, we call `connect` recursively after a 5 second delay. Because we are using `delayBy`, we needed to add a `Temporal` constraint to `F`. This same pattern could be used for more advanced retry strategies -- e.g., exponential delays and failing after a fixed number of attempts. Streams that call methods on `Socket` can fail with exceptions due to loss of the underlying TCP connection. Such exceptions can be handled in a similar manner.
129+
We've extracted the `Network[IO].connect` call in to a new method called `retryingConnect`. The `retryingConnect` method attempts to create a client and handles the `ConnectException`. Upon encountering the exception, we call `retryingConnect` recursively after a 5 second delay. Because we are using `delayBy`, we needed to add a `Temporal` constraint to `F`. This same pattern could be used for more advanced retry strategies -- e.g., exponential delays and failing after a fixed number of attempts. Streams that call methods on `Socket` can fail with exceptions due to loss of the underlying TCP connection. Such exceptions can be handled in a similar manner.
130130

131131
### Servers
132132

@@ -136,18 +136,20 @@ Now let's implement a server application that communicates with the client app w
136136
import cats.effect.Concurrent
137137

138138
def echoServer[F[_]: Concurrent: Network]: F[Unit] =
139-
Network[F].server(port = Some(port"5555")).map { client =>
140-
client.reads
141-
.through(text.utf8.decode)
142-
.through(text.lines)
143-
.interleave(Stream.constant("\n"))
144-
.through(text.utf8.encode)
145-
.through(client.writes)
146-
.handleErrorWith(_ => Stream.empty) // handle errors of client sockets
139+
Stream.resource(Network[F].bind(SocketAddress.port(port"5555"))).map { serverSocket =>
140+
serverSocket.accept.map { clientSocket =>
141+
clientSocket.reads
142+
.through(text.utf8.decode)
143+
.through(text.lines)
144+
.interleave(Stream.constant("\n"))
145+
.through(text.utf8.encode)
146+
.through(clientSocket.writes)
147+
.handleErrorWith(_ => Stream.empty) // handle errors of client sockets
148+
}
147149
}.parJoin(100).compile.drain
148150
```
149151

150-
We start with a call to `Network[IO].server` which returns a value of an interesting type -- `Stream[F, Socket[F]]`. This is an infinite stream of client sockets -- each time a client connects to the server, a `Socket[F]` is emitted, allowing interaction with that client. The lifetime of the client socket is managed by the overall stream -- e.g. flat mapping over a socket will keep that socket open until the returned inner stream completes, at which point, the client socket is closed and any underlying resources are returned to the runtime environment.
152+
We start with a call to `Network[IO].bind` which returns a value of type `Resource[F, ServerSocket[F]]`. The `ServerSocket` type defines an `accept` method of type `Stream[F, Socket[F]]`. This is an infinite stream of client sockets -- each time a client connects to the server, a `Socket[F]` is emitted, allowing interaction with that client. The lifetime of the client socket is managed by the overall stream -- e.g. flat mapping over a socket will keep that socket open until the returned inner stream completes, at which point, the client socket is closed and any underlying resources are returned to the runtime environment.
151153

152154
We map over this infinite stream of clients and provide the logic for handling an individual client. In this case,
153155
we read from the client socket, UTF-8 decode the received bytes, extract individual lines, and then write each line back to the client. This logic is implemented as a single `Stream[F, Unit]`.
@@ -156,7 +158,7 @@ Since we mapped over the infinite client stream, we end up with a `Stream[F, Str
156158

157159
In joining all these streams together, be prudent to handle errors in the client streams.
158160

159-
The pattern of `Network[F].server(address).map(handleClient).parJoin(maxConcurrentClients)` is very common when working with server sockets.
161+
The pattern of `Network[F].bind(address).map(ss => handleClient(ss.accept)).parJoin(maxConcurrentClients)` is very common when working with server sockets.
160162

161163
A simpler echo server could be implemented with this core logic:
162164

@@ -234,7 +236,7 @@ import com.comcast.ip4s._
234236

235237
def client[F[_]: MonadCancelThrow: Console: Network](
236238
tlsContext: TLSContext[F]): Stream[F, Unit] = {
237-
Stream.resource(Network[F].client(SocketAddress(host"localhost", port"5555"))).flatMap { underlyingSocket =>
239+
Stream.resource(Network[F].connect(SocketAddress(host"localhost", port"5555"))).flatMap { underlyingSocket =>
238240
Stream.resource(tlsContext.client(underlyingSocket)).flatMap { socket =>
239241
Stream("Hello, world!")
240242
.interleave(Stream.constant("\n"))
@@ -263,10 +265,10 @@ import fs2.io.net.tls.{TLSParameters, TLSSocket}
263265
import cats.effect.Resource
264266
import javax.net.ssl.SNIHostName
265267

266-
def tlsClientWithSni[F[_]: MonadCancelThrow: Network](
268+
def tlsClientWithSni[F[_]: Network](
267269
tlsContext: TLSContext[F],
268270
address: SocketAddress[Host]): Resource[F, TLSSocket[F]] =
269-
Network[F].client(address).flatMap { underlyingSocket =>
271+
Network[F].connect(address).flatMap { underlyingSocket =>
270272
tlsContext.clientBuilder(
271273
underlyingSocket
272274
).withParameters(
@@ -291,7 +293,7 @@ def debug[F[_]: MonadCancelThrow: Network](
291293
tlsContext: TLSContext[F],
292294
address: SocketAddress[Host]
293295
): F[String] =
294-
Network[F].client(address).use { underlyingSocket =>
296+
Network[F].connect(address).use { underlyingSocket =>
295297
tlsContext
296298
.clientBuilder(underlyingSocket)
297299
.withParameters(
@@ -316,13 +318,15 @@ The `fs2.io.file` package provides support for working with files. The README ex
316318

317319
```scala mdoc
318320
import cats.effect.Concurrent
319-
import fs2.{hash, text}
321+
import fs2.text
322+
import fs2.hashing.{Hashing, HashAlgorithm}
320323
import fs2.io.file.{Files, Path}
321324

322-
def writeDigest[F[_]: Files: Concurrent](path: Path): F[Path] = {
325+
def writeDigest[F[_]: Files: Hashing: Concurrent](path: Path): F[Path] = {
323326
val target = Path(path.toString + ".sha256")
324327
Files[F].readAll(path)
325-
.through(hash.sha256)
328+
.through(Hashing[F].hash(HashAlgorithm.SHA256))
329+
.flatMap(h => Stream.chunk(h.bytes))
326330
.through(text.hex.encode)
327331
.through(text.utf8.encode)
328332
.through(Files[F].writeAll(target))
@@ -436,4 +440,4 @@ The `fs2.io.writeOutputStream` method provides a pipe that writes the bytes emit
436440
The `fs2.io.readOutputStream` method creates a `Stream[F, Byte]` from a function which writes to an `OutputStream`.
437441

438442
[s2n-tls]: https://github.com/aws/s2n-tls
439-
[`node:tls` module]: https://nodejs.org/api/tls.html
443+
[`node:tls` module]: https://nodejs.org/api/tls.html

site/timeseries.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ Our `withBitrate` combinator requires a `Stream[F, TimeStamped[ByteVector]]` arg
2525

2626
```scala mdoc
2727
def withReceivedBitrate[F[_]](input: Stream[F, Byte]): Stream[F, TimeStamped[Either[Long, ByteVector]]] =
28-
input.chunks.map(c => TimeStamped.unsafeNow(c.toByteVector)).through(withBitrate)
28+
input.chunks.map(c => TimeStamped.unsafeMonotonic(c.toByteVector)).through(withBitrate)
2929
```
3030

3131
Each emitted sample is the sum of bits received during each one second period. Let's compute an average of that value over the last 10 seconds. We can do this via `mapAccumulate` along with a `scala.collection.immutable.Queue`:

0 commit comments

Comments
 (0)