Skip to content

Commit fa79c60

Browse files
committed
deploy: 6a5d774
1 parent f556452 commit fa79c60

File tree

4 files changed

+63
-63
lines changed

4 files changed

+63
-63
lines changed

concurrency-primitives.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ The program ends after 15 seconds when the signal interrupts the publishing of m
147147

148148
```scala
149149
import scala.concurrent.duration._
150-
import scala.language.higherKinds
151150
import cats.effect.std.Console
152151
import cats.effect.{Clock, IO, IOApp, Temporal}
153152
import cats.syntax.all._

guide.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -718,10 +718,10 @@ val program =
718718
// program: Stream[[x]IO[x], Unit] = Stream(..)
719719

720720
program.compile.drain.unsafeRunSync()
721-
// 10:59:33.802451043
722-
// 10:59:34.802421143
723-
// 10:59:35.802444230
724-
// 10:59:36.802461152
721+
// 10:58:08.001470869
722+
// 10:58:09.001363373
723+
// 10:58:10.001378136
724+
// 10:58:11.001402676
725725
```
726726

727727
Let's take this line by line now, so we can understand what's going on.
@@ -763,10 +763,10 @@ val program1 =
763763
// program1: Stream[[x]IO[x], Unit] = Stream(..)
764764

765765
program1.compile.drain.unsafeRunSync()
766-
// 10:59:38.804607869
767-
// 10:59:39.804606156
768-
// 10:59:40.804651955
769-
// 10:59:41.804603916
766+
// 10:58:13.003652718
767+
// 10:58:14.003658123
768+
// 10:58:15.003649207
769+
// 10:58:16.003731218
770770
```
771771

772772
### Talking to the external world
@@ -799,7 +799,7 @@ The way you bring synchronous effects into your effect type may differ. `Sync.de
799799
import cats.effect.Sync
800800

801801
val T = Sync[IO]
802-
// T: cats.effect.kernel.Async[IO] = cats.effect.IO$IOAsync@550f3c76
802+
// T: cats.effect.kernel.Async[IO] = cats.effect.IO$IOAsync@7718ecfa
803803
val s2 = Stream.exec(T.delay { destroyUniverse() }) ++ Stream("...moving on")
804804
// s2: Stream[[x]IO[x], String] = Stream(..)
805805
s2.compile.toVector.unsafeRunSync()
@@ -933,15 +933,15 @@ stream.toUnicastPublisher
933933
// source = Bind(
934934
// source = Eval(
935935
// fa = Delay(
936-
// thunk = cats.effect.IO$$$Lambda$12222/0x00007fd2c8053540@582ad521,
936+
// thunk = cats.effect.IO$$$Lambda$11188/0x00007ff69be8b940@b90cd45,
937937
// event = cats.effect.tracing.TracingEvent$StackTrace
938938
// )
939939
// ),
940-
// fs = cats.effect.std.Supervisor$$$Lambda$13139/0x00007fd2c825b020@4399bd4
940+
// fs = cats.effect.std.Supervisor$$$Lambda$12109/0x00007ff69c0b4c30@23ee6cbf
941941
// ),
942-
// fs = cats.effect.std.Dispatcher$$$Lambda$13140/0x00007fd2c825b3f0@3c7d2e9b
942+
// fs = cats.effect.std.Dispatcher$$$Lambda$12110/0x00007ff69c0b5000@39287eea
943943
// ),
944-
// fs = cats.effect.kernel.Resource$$Lambda$13082/0x00007fd2c8242000@2b6215b1
944+
// fs = cats.effect.kernel.Resource$$Lambda$12052/0x00007ff69c0985d0@3a2568dd
945945
// )
946946
```
947947

@@ -954,25 +954,25 @@ val publisher: Resource[IO, StreamUnicastPublisher[IO, Int]] = Stream(1, 2, 3).c
954954
// source = Bind(
955955
// source = Eval(
956956
// fa = Delay(
957-
// thunk = cats.effect.IO$$$Lambda$12222/0x00007fd2c8053540@25ef5e21,
957+
// thunk = cats.effect.IO$$$Lambda$11188/0x00007ff69be8b940@37d26039,
958958
// event = cats.effect.tracing.TracingEvent$StackTrace
959959
// )
960960
// ),
961-
// fs = cats.effect.std.Supervisor$$$Lambda$13139/0x00007fd2c825b020@6dc09673
961+
// fs = cats.effect.std.Supervisor$$$Lambda$12109/0x00007ff69c0b4c30@678969e8
962962
// ),
963-
// fs = cats.effect.std.Dispatcher$$$Lambda$13140/0x00007fd2c825b3f0@389c64a6
963+
// fs = cats.effect.std.Dispatcher$$$Lambda$12110/0x00007ff69c0b5000@1d2c94dd
964964
// ),
965-
// fs = cats.effect.kernel.Resource$$Lambda$13082/0x00007fd2c8242000@3e83199
965+
// fs = cats.effect.kernel.Resource$$Lambda$12052/0x00007ff69c0985d0@cde7425
966966
// )
967967
publisher.use { p =>
968968
p.toStream[IO].compile.toList
969969
}
970970
// res59: IO[List[Int]] = FlatMap(
971971
// ioe = Delay(
972-
// thunk = cats.effect.IO$$$Lambda$12222/0x00007fd2c8053540@25ef5e21,
972+
// thunk = cats.effect.IO$$$Lambda$11188/0x00007ff69be8b940@37d26039,
973973
// event = cats.effect.tracing.TracingEvent$StackTrace
974974
// ),
975-
// f = cats.effect.kernel.Resource$$Lambda$13142/0x00007fd2c8260648@16e06d35,
975+
// f = cats.effect.kernel.Resource$$Lambda$12112/0x00007ff69c0b6240@7977c414,
976976
// event = cats.effect.tracing.TracingEvent$StackTrace
977977
// )
978978
```

io.md

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,38 +25,37 @@ The `fs2.io.net.Socket` trait provides mechanisms for reading and writing data -
2525
To get started, let's write a client program that connects to a server, sends a message, and reads a response.
2626

2727
```scala
28-
import fs2.{Chunk, Stream}
28+
import fs2.Chunk
2929
import fs2.io.net.Network
3030
import cats.effect.MonadCancelThrow
3131
import cats.effect.std.Console
3232
import cats.syntax.all._
3333
import com.comcast.ip4s._
3434

3535
def client[F[_]: MonadCancelThrow: Console: Network]: F[Unit] =
36-
Network[F].client(SocketAddress(host"localhost", port"5555")).use { socket =>
36+
Network[F].connect(SocketAddress(host"localhost", port"5555")).use { socket =>
3737
socket.write(Chunk.array("Hello, world!".getBytes)) >>
3838
socket.read(8192).flatMap { response =>
3939
Console[F].println(s"Response: $response")
4040
}
4141
}
4242
```
4343

44-
To open a socket that's connected to `localhost:5555`, we use the `client` method on the `Network` capability. The `Network` capability provides the runtime environment for the sockets it creates.
44+
To open a socket that's connected to `localhost:5555`, we use the `connect` method on the `Network` capability. The `Network` capability provides the runtime environment for the sockets it creates.
4545

46-
The `Network[F].client` method returns a `Resource[F, Socket[F]]` which automatically closes the socket after the resource has been used. To write data to the socket, we call `socket.write`, which takes a `Chunk[Byte]` and returns an `F[Unit]`. Once the write completes, we do a single read from the socket via `socket.read`, passing the maximum amount of bytes we want to read. The returns an `F[Option[Chunk[Byte]]]` -- `None` if the socket reaches end of input and `Some` if the read produced a chunk. Finally, we print the response to the console.
46+
The `Network[F].connect` method returns a `Resource[F, Socket[F]]` which automatically closes the socket after the resource has been used. To write data to the socket, we call `socket.write`, which takes a `Chunk[Byte]` and returns an `F[Unit]`. Once the write completes, we do a single read from the socket via `socket.read`, passing the maximum amount of bytes we want to read. The returns an `F[Option[Chunk[Byte]]]` -- `None` if the socket reaches end of input and `Some` if the read produced a chunk. Finally, we print the response to the console.
4747

4848
Note we aren't doing any binary message framing or packetization in this example. Hence, it's very possible for the single read to only receive a portion of the original message -- perhaps just the bytes for `"Hello, w"`. We can use FS2 streams to simplify this. The `Socket` trait defines stream operations -- `writes` and `reads`. We could rewrite this example using the stream operations like so:
4949

5050
```scala
51-
import fs2.{Chunk, Stream, text}
51+
import fs2.{Stream, text}
5252
import fs2.io.net.Network
5353
import cats.effect.MonadCancelThrow
5454
import cats.effect.std.Console
55-
import cats.syntax.all._
5655
import com.comcast.ip4s._
5756

5857
def client[F[_]: MonadCancelThrow: Console: Network]: Stream[F, Unit] =
59-
Stream.resource(Network[F].client(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
58+
Stream.resource(Network[F].connect(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
6059
Stream("Hello, world!")
6160
.through(text.utf8.encode)
6261
.through(socket.writes) ++
@@ -76,7 +75,7 @@ This program won't end until the server side closes the socket or indicates ther
7675

7776
```scala
7877
def client[F[_]: MonadCancelThrow: Console: Network]: Stream[F, Unit] =
79-
Stream.resource(Network[F].client(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
78+
Stream.resource(Network[F].connect(SocketAddress(host"localhost", port"5555"))).flatMap { socket =>
8079
Stream("Hello, world!")
8180
.interleave(Stream.constant("\n"))
8281
.through(text.utf8.encode)
@@ -95,23 +94,23 @@ To update the write side, we added `.interleave(Stream.constant("\n"))` before d
9594

9695
#### Handling Connection Errors
9796

98-
If a TCP connection cannot be established, `socketGroup.client` fails with a `java.net.ConnectException`. To automatically attempt a reconnection, we can handle the `ConnectException` and try connecting again.
97+
If a TCP connection cannot be established, `connect` fails with a `fs2.io.net.ConnectException`. To automatically attempt a reconnection, we can handle the `ConnectException` and try connecting again.
9998

10099
```scala
101100
import scala.concurrent.duration._
102101
import cats.effect.Temporal
103-
import fs2.io.net.Socket
104-
import java.net.ConnectException
102+
import fs2.io.net.{ConnectException, Socket}
105103

106-
def connect[F[_]: Temporal: Network](address: SocketAddress[Host]): Stream[F, Socket[F]] =
107-
Stream.resource(Network[F].client(address))
104+
def retryingConnect[F[_]: Temporal: Network](address: SocketAddress[Host]): Stream[F, Socket[F]] =
105+
Stream.resource(Network[F].connect(address))
108106
.handleErrorWith {
109107
case _: ConnectException =>
110-
connect(address).delayBy(5.seconds)
108+
retryingConnect(address).delayBy(5.seconds)
109+
case other => Stream.raiseError(other)
111110
}
112111

113112
def client[F[_]: Temporal: Console: Network]: Stream[F, Unit] =
114-
connect(SocketAddress(host"localhost", port"5555")).flatMap { socket =>
113+
retryingConnect(SocketAddress(host"localhost", port"5555")).flatMap { socket =>
115114
Stream("Hello, world!")
116115
.interleave(Stream.constant("\n"))
117116
.through(text.utf8.encode)
@@ -126,7 +125,7 @@ def client[F[_]: Temporal: Console: Network]: Stream[F, Unit] =
126125
}
127126
```
128127

129-
We've extracted the `Network[IO].client` call in to a new method called `connect`. The connect method attempts to create a client and handles the `ConnectException`. Upon encountering the exception, we call `connect` recursively after a 5 second delay. Because we are using `delayBy`, we needed to add a `Temporal` constraint to `F`. This same pattern could be used for more advanced retry strategies -- e.g., exponential delays and failing after a fixed number of attempts. Streams that call methods on `Socket` can fail with exceptions due to loss of the underlying TCP connection. Such exceptions can be handled in a similar manner.
128+
We've extracted the `Network[IO].connect` call in to a new method called `retryingConnect`. The `retryingConnect` method attempts to create a client and handles the `ConnectException`. Upon encountering the exception, we call `retryingConnect` recursively after a 5 second delay. Because we are using `delayBy`, we needed to add a `Temporal` constraint to `F`. This same pattern could be used for more advanced retry strategies -- e.g., exponential delays and failing after a fixed number of attempts. Streams that call methods on `Socket` can fail with exceptions due to loss of the underlying TCP connection. Such exceptions can be handled in a similar manner.
130129

131130
### Servers
132131

@@ -136,18 +135,20 @@ Now let's implement a server application that communicates with the client app w
136135
import cats.effect.Concurrent
137136

138137
def echoServer[F[_]: Concurrent: Network]: F[Unit] =
139-
Network[F].server(port = Some(port"5555")).map { client =>
140-
client.reads
141-
.through(text.utf8.decode)
142-
.through(text.lines)
143-
.interleave(Stream.constant("\n"))
144-
.through(text.utf8.encode)
145-
.through(client.writes)
146-
.handleErrorWith(_ => Stream.empty) // handle errors of client sockets
138+
Stream.resource(Network[F].bind(SocketAddress.port(port"5555"))).map { serverSocket =>
139+
serverSocket.accept.map { clientSocket =>
140+
clientSocket.reads
141+
.through(text.utf8.decode)
142+
.through(text.lines)
143+
.interleave(Stream.constant("\n"))
144+
.through(text.utf8.encode)
145+
.through(clientSocket.writes)
146+
.handleErrorWith(_ => Stream.empty) // handle errors of client sockets
147+
}
147148
}.parJoin(100).compile.drain
148149
```
149150

150-
We start with a call to `Network[IO].server` which returns a value of an interesting type -- `Stream[F, Socket[F]]`. This is an infinite stream of client sockets -- each time a client connects to the server, a `Socket[F]` is emitted, allowing interaction with that client. The lifetime of the client socket is managed by the overall stream -- e.g. flat mapping over a socket will keep that socket open until the returned inner stream completes, at which point, the client socket is closed and any underlying resources are returned to the runtime environment.
151+
We start with a call to `Network[IO].bind` which returns a value of type `Resource[F, ServerSocket[F]]`. The `ServerSocket` type defines an `accept` method of type `Stream[F, Socket[F]]`. This is an infinite stream of client sockets -- each time a client connects to the server, a `Socket[F]` is emitted, allowing interaction with that client. The lifetime of the client socket is managed by the overall stream -- e.g. flat mapping over a socket will keep that socket open until the returned inner stream completes, at which point, the client socket is closed and any underlying resources are returned to the runtime environment.
151152

152153
We map over this infinite stream of clients and provide the logic for handling an individual client. In this case,
153154
we read from the client socket, UTF-8 decode the received bytes, extract individual lines, and then write each line back to the client. This logic is implemented as a single `Stream[F, Unit]`.
@@ -156,7 +157,7 @@ Since we mapped over the infinite client stream, we end up with a `Stream[F, Str
156157

157158
In joining all these streams together, be prudent to handle errors in the client streams.
158159

159-
The pattern of `Network[F].server(address).map(handleClient).parJoin(maxConcurrentClients)` is very common when working with server sockets.
160+
The pattern of `Network[F].bind(address).map(ss => handleClient(ss.accept)).parJoin(maxConcurrentClients)` is very common when working with server sockets.
160161

161162
A simpler echo server could be implemented with this core logic:
162163

@@ -170,26 +171,24 @@ The [fs2-chat](https://github.com/functional-streams-for-scala/fs2-chat) sample
170171

171172
## UDP
172173

173-
UDP support works much the same way as TCP. The `fs2.io.net.DatagramSocket` trait provides mechanisms for reading and writing UDP datagrams. UDP sockets are created via the `openDatagramSocket` method on `fs2.io.net.Network`. Unlike TCP, there's no differentiation between client and server sockets. Additionally, since UDP is a packet based protocol, read and write operations use `fs2.io.net.Datagram` values, which consist of a `Chunk[Byte]` and a `SocketAddress[IpAddress]`.
174+
UDP support works much the same way as TCP. The `fs2.io.net.DatagramSocket` trait provides mechanisms for reading and writing UDP datagrams. UDP sockets are created via the `bindDatagramSocket` method on `fs2.io.net.Network`. Unlike TCP, there's no differentiation between client and server sockets.
174175

175176
Adapting the TCP client example for UDP gives us the following:
176177

177178
```scala
178179
import fs2.{Stream, text}
179-
import fs2.io.net.{Datagram, Network}
180+
import fs2.io.net.Network
180181
import cats.effect.Concurrent
181182
import cats.effect.std.Console
182183
import com.comcast.ip4s._
183184

184185
def client[F[_]: Concurrent: Console: Network]: F[Unit] = {
185186
val address = SocketAddress(ip"127.0.0.1", port"5555")
186-
Stream.resource(Network[F].openDatagramSocket()).flatMap { socket =>
187+
Stream.resource(Network[F].bindDatagramSocket()).flatMap { socket =>
187188
Stream("Hello, world!")
188189
.through(text.utf8.encode)
189190
.chunks
190-
.map(data => Datagram(address, data))
191-
.through(socket.writes)
192-
.drain ++
191+
.evalTap(data => socket.write(data, address)) ++
193192
socket.reads
194193
.flatMap(datagram => Stream.chunk(datagram.bytes))
195194
.through(text.utf8.decode)
@@ -200,11 +199,11 @@ def client[F[_]: Concurrent: Console: Network]: F[Unit] = {
200199
}
201200
```
202201

203-
When writing, we map each chunk of bytes to a `Datagram`, which includes the destination address of the packet. When reading, we convert the `Stream[F, Datagram]` to a `Stream[F, Byte]` via `flatMap(datagram => Stream.chunk(datagram.bytes))`. Otherwise, the example is unchanged.
202+
We call `socket.write`, supplying a chunk of bytes and the destination address. When reading, we convert the `Stream[F, Datagram]` to a `Stream[F, Byte]` via `flatMap(datagram => Stream.chunk(datagram.bytes))`. Otherwise, the example is unchanged.
204203

205204
```scala
206205
def echoServer[F[_]: Concurrent: Network]: F[Unit] =
207-
Stream.resource(Network[F].openDatagramSocket(port = Some(port"5555"))).flatMap { socket =>
206+
Stream.resource(Network[F].bindDatagramSocket(SocketAddress.port(port"5555"))).flatMap { socket =>
208207
socket.reads.through(socket.writes)
209208
}.compile.drain
210209
```
@@ -234,7 +233,7 @@ import com.comcast.ip4s._
234233

235234
def client[F[_]: MonadCancelThrow: Console: Network](
236235
tlsContext: TLSContext[F]): Stream[F, Unit] = {
237-
Stream.resource(Network[F].client(SocketAddress(host"localhost", port"5555"))).flatMap { underlyingSocket =>
236+
Stream.resource(Network[F].connect(SocketAddress(host"localhost", port"5555"))).flatMap { underlyingSocket =>
238237
Stream.resource(tlsContext.client(underlyingSocket)).flatMap { socket =>
239238
Stream("Hello, world!")
240239
.interleave(Stream.constant("\n"))
@@ -263,10 +262,10 @@ import fs2.io.net.tls.{TLSParameters, TLSSocket}
263262
import cats.effect.Resource
264263
import javax.net.ssl.SNIHostName
265264

266-
def tlsClientWithSni[F[_]: MonadCancelThrow: Network](
265+
def tlsClientWithSni[F[_]: Network](
267266
tlsContext: TLSContext[F],
268267
address: SocketAddress[Host]): Resource[F, TLSSocket[F]] =
269-
Network[F].client(address).flatMap { underlyingSocket =>
268+
Network[F].connect(address).flatMap { underlyingSocket =>
270269
tlsContext.clientBuilder(
271270
underlyingSocket
272271
).withParameters(
@@ -291,7 +290,7 @@ def debug[F[_]: MonadCancelThrow: Network](
291290
tlsContext: TLSContext[F],
292291
address: SocketAddress[Host]
293292
): F[String] =
294-
Network[F].client(address).use { underlyingSocket =>
293+
Network[F].connect(address).use { underlyingSocket =>
295294
tlsContext
296295
.clientBuilder(underlyingSocket)
297296
.withParameters(
@@ -316,13 +315,15 @@ The `fs2.io.file` package provides support for working with files. The README ex
316315

317316
```scala
318317
import cats.effect.Concurrent
319-
import fs2.{hash, text}
318+
import fs2.text
319+
import fs2.hashing.{Hashing, HashAlgorithm}
320320
import fs2.io.file.{Files, Path}
321321

322-
def writeDigest[F[_]: Files: Concurrent](path: Path): F[Path] = {
322+
def writeDigest[F[_]: Files: Hashing: Concurrent](path: Path): F[Path] = {
323323
val target = Path(path.toString + ".sha256")
324324
Files[F].readAll(path)
325-
.through(hash.sha256)
325+
.through(Hashing[F].hash(HashAlgorithm.SHA256))
326+
.flatMap(h => Stream.chunk(h.bytes))
326327
.through(text.hex.encode)
327328
.through(text.utf8.encode)
328329
.through(Files[F].writeAll(target))
@@ -436,4 +437,4 @@ The `fs2.io.writeOutputStream` method provides a pipe that writes the bytes emit
436437
The `fs2.io.readOutputStream` method creates a `Stream[F, Byte]` from a function which writes to an `OutputStream`.
437438

438439
[s2n-tls]: https://github.com/aws/s2n-tls
439-
[`node:tls` module]: https://nodejs.org/api/tls.html
440+
[`node:tls` module]: https://nodejs.org/api/tls.html

timeseries.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ Our `withBitrate` combinator requires a `Stream[F, TimeStamped[ByteVector]]` arg
2525

2626
```scala
2727
def withReceivedBitrate[F[_]](input: Stream[F, Byte]): Stream[F, TimeStamped[Either[Long, ByteVector]]] =
28-
input.chunks.map(c => TimeStamped.unsafeNow(c.toByteVector)).through(withBitrate)
28+
input.chunks.map(c => TimeStamped.unsafeMonotonic(c.toByteVector)).through(withBitrate)
2929
```
3030

3131
Each emitted sample is the sum of bits received during each one second period. Let's compute an average of that value over the last 10 seconds. We can do this via `mapAccumulate` along with a `scala.collection.immutable.Queue`:

0 commit comments

Comments
 (0)