Skip to content

Commit d423708

Browse files
authored
Merge pull request #417 from fd4s/ce-3
Port to cats-effect 3.0
2 parents cec73ce + 31a4ab6 commit d423708

54 files changed

Lines changed: 385 additions & 506 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.sbt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
val catsEffectVersion = "2.3.1"
1+
val catsEffectVersion = "3.0.0-RC2"
22

33
val catsVersion = "2.4.1"
44

55
val confluentVersion = "6.1.0"
66

7-
val fs2Version = "2.5.0"
7+
val fs2Version = "3.0.0-M8"
88

99
val kafkaVersion = "2.7.0"
1010

@@ -36,6 +36,7 @@ lazy val core = project
3636
name := moduleName.value,
3737
dependencySettings ++ Seq(
3838
libraryDependencies ++= Seq(
39+
"org.typelevel" %% "cats-effect" % catsEffectVersion,
3940
"co.fs2" %% "fs2-core" % fs2Version,
4041
"org.apache.kafka" % "kafka-clients" % kafkaVersion
4142
)
@@ -87,6 +88,7 @@ lazy val dependencySettings = Seq(
8788
.withDottyCompat(scalaVersion.value),
8889
"org.typelevel" %% "discipline-scalatest" % "2.1.1",
8990
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
91+
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
9092
"org.typelevel" %% "cats-testkit-scalatest" % "2.1.1",
9193
"ch.qos.logback" % "logback-classic" % "1.2.3"
9294
).map(_ % Test),
@@ -226,7 +228,7 @@ lazy val publishSettings =
226228
)
227229

228230
lazy val mimaSettings = Seq(
229-
// Restore this after releasing v2.0.0
231+
// Restore this after releasing v3.0.0
230232
// mimaPreviousArtifacts := {
231233
// if (publishArtifact.value) {
232234
// Set(organization.value %% moduleName.value % (previousStableVersion in ThisBuild).value.get)

docs/src/main/mdoc/admin.md

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ def adminClientSettings[F[_]: Sync](bootstrapServers: String): AdminClientSettin
2828

2929
There are several settings specific to the library.
3030

31-
- `withBlocker` sets the `Blocker` on which blocking Java Kafka `AdminClient` functions are executed. Unless specified, a default fixed single-thread pool is created as part of admin client initialization, with the thread name using the `fs2-kafka-admin-client` prefix.
32-
3331
- `withCloseTimeout` controls the timeout when waiting for admin client shutdown. Default is 20 seconds.
3432

3533
- `withCreateAdminClient` changes how the underlying Java Kafka admin client is created. The default creates a Java `AdminClient` instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.
@@ -39,7 +37,7 @@ There are several settings specific to the library.
3937
Once settings are defined, we can use create an admin client in a `Stream`.
4038

4139
```scala mdoc:silent
42-
def kafkaAdminClientStream[F[_]: Concurrent: ContextShift](
40+
def kafkaAdminClientStream[F[_]: Async](
4341
bootstrapServers: String
4442
): Stream[F, KafkaAdminClient[F]] =
4543
KafkaAdminClient.stream(adminClientSettings[F](bootstrapServers))
@@ -48,7 +46,7 @@ def kafkaAdminClientStream[F[_]: Concurrent: ContextShift](
4846
Alternatively, we can create an admin client in a `Resource` context.
4947

5048
```scala mdoc:silent
51-
def kafkaAdminClientResource[F[_]: Concurrent: ContextShift](
49+
def kafkaAdminClientResource[F[_]: Async](
5250
bootstrapServers: String
5351
): Resource[F, KafkaAdminClient[F]] =
5452
KafkaAdminClient.resource(adminClientSettings[F](bootstrapServers))
@@ -61,7 +59,7 @@ There are functions available for describing, creating, and deleting topics.
6159
```scala mdoc:silent
6260
import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
6361

64-
def topicOperations[F[_]: Concurrent: ContextShift]: F[Unit] =
62+
def topicOperations[F[_]: Async]: F[Unit] =
6563
kafkaAdminClientResource[F]("localhost:9092").use { client =>
6664
for {
6765
topicNames <- client.listTopics.names
@@ -83,7 +81,7 @@ We can edit the configuration of different resources, like topics and nodes.
8381
import org.apache.kafka.common.config.ConfigResource
8482
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
8583

86-
def configOperations[F[_]: Concurrent: ContextShift]: F[Unit] =
84+
def configOperations[F[_]: Async]: F[Unit] =
8785
kafkaAdminClientResource[F]("localhost:9092").use { client =>
8886
val topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic")
8987

@@ -108,7 +106,7 @@ It's possible to retrieve metadata about the cluster nodes.
108106
```scala mdoc:silent
109107
import org.apache.kafka.common.Node
110108

111-
def clusterNodes[F[_]: Concurrent: ContextShift]: F[Set[Node]] =
109+
def clusterNodes[F[_]: Async]: F[Set[Node]] =
112110
kafkaAdminClientResource[F]("localhost:9092").use(_.describeCluster.nodes)
113111
```
114112

@@ -117,9 +115,7 @@ def clusterNodes[F[_]: Concurrent: ContextShift]: F[Set[Node]] =
117115
There are functions available for working with consumer groups.
118116

119117
```scala mdoc:silent
120-
import cats.Parallel
121-
122-
def consumerGroupOperations[F[_]: Concurrent: ContextShift: Parallel]: F[Unit] =
118+
def consumerGroupOperations[F[_]: Async: cats.Parallel]: F[Unit] =
123119
kafkaAdminClientResource[F]("localhost:9092").use { client =>
124120
for {
125121
consumerGroupIds <- client.listConsumerGroups.groupIds
@@ -145,7 +141,7 @@ import org.apache.kafka.common.resource.{
145141
ResourceType
146142
}
147143

148-
def aclOperations[F[_]: Concurrent: ContextShift]: F[Unit] =
144+
def aclOperations[F[_]: Async]: F[Unit] =
149145
kafkaAdminClientResource[F]("localhost:9092").use { client =>
150146
for {
151147
describedAcls <- client.describeAcls(AclBindingFilter.ANY)

docs/src/main/mdoc/consumers.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,6 @@ In addition, there are several settings specific to the library.
144144

145145
- `withCreateConsumer` changes how the underlying Java Kafka consumer is created. The default merely creates a Java `KafkaConsumer` instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.
146146

147-
- `withBlocker` sets the `Blocker` on which blocking Java Kafka consumer functions are executed. Unless specified, a default fixed single-thread pool is created as part of consumer initialization, with the thread name using the `fs2-kafka-consumer` prefix.
148-
149147
- `withMaxPrefetchBatches` adjusts the maximum number of record batches per topic-partition to prefetch before backpressure is applied. The default is 2, meaning there can be up to 2 record batches per topic-partition waiting to be processed.
150148

151149
- `withPollInterval` alters how often consumer `poll` should take place. Default is 50 milliseconds.
@@ -344,7 +342,7 @@ To achieve this behavior we could use a `stopConsuming` method on a` KafkaConsum
344342
We could combine `stopConsuming` with the custom resource handling and implement a graceful shutdown. Let's try it:
345343

346344
```scala mdoc:silent
347-
import cats.effect.concurrent.{Deferred, Ref}
345+
import cats.effect.{Deferred, Ref}
348346

349347
object WithGracefulShutdownExample extends IOApp {
350348
def run(args: List[String]): IO[ExitCode] = {
@@ -372,7 +370,7 @@ object WithGracefulShutdownExample extends IOApp {
372370
}.uncancelable // [7]
373371
} { case ((consumer, closeConsumer), exitCase) => // [8]
374372
(exitCase match {
375-
case ExitCase.Error(e) => handleError(e) // [9]
373+
case Outcome.Errored(e) => handleError(e) // [9]
376374
case _ => for {
377375
_ <- gracefulShutdownStartedRef.set(true) // [10]
378376
_ <- consumer.stopConsuming // [11]

docs/src/main/mdoc/producers.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,6 @@ The following settings are specific to the library.
134134

135135
- `withCreateProducer` changes how the underlying Java Kafka producer is created. The default merely creates a Java `KafkaProducer` instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.
136136

137-
- `withBlocker` sets the `Blocker` on which blocking Java Kafka producer functions are executed. Unless specified, a default fixed single-thread pool is created as part of producer initialization, with the thread name using the `fs2-kafka-producer` prefix.
138-
139137
## Producer Creation
140138

141139
Once [`ProducerSettings`][producersettings] is defined, use `KafkaProducer.stream` to create a [`KafkaProducer`][kafkaproducer] instance.

modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
package fs2.kafka
88

9-
import cats.effect.{Blocker, Sync}
9+
import cats.effect.Sync
1010
import cats.Show
1111
import fs2.kafka.internal.converters.collection._
1212
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
@@ -27,19 +27,6 @@ import scala.concurrent.duration._
2727
*/
2828
sealed abstract class AdminClientSettings[F[_]] {
2929

30-
/**
31-
* The `Blocker` to use for blocking Kafka operations. If not
32-
* explicitly provided, a default `Blocker` will be created
33-
* when creating a `KafkaAdminClient` instance.
34-
*/
35-
def blocker: Option[Blocker]
36-
37-
/**
38-
* Returns a new [[AdminClientSettings]] instance with the
39-
* specified [[blocker]] to use for blocking operations.
40-
*/
41-
def withBlocker(blocker: Blocker): AdminClientSettings[F]
42-
4330
/**
4431
* Properties which can be provided when creating a Java `KafkaAdminClient`
4532
* instance. Numerous functions in [[AdminClientSettings]] add properties
@@ -205,13 +192,10 @@ sealed abstract class AdminClientSettings[F[_]] {
205192

206193
object AdminClientSettings {
207194
private[this] final case class AdminClientSettingsImpl[F[_]](
208-
override val blocker: Option[Blocker],
209195
override val properties: Map[String, String],
210196
override val closeTimeout: FiniteDuration,
211197
val createAdminClientWith: Map[String, String] => F[AdminClient]
212198
) extends AdminClientSettings[F] {
213-
override def withBlocker(blocker: Blocker): AdminClientSettings[F] =
214-
copy(blocker = Some(blocker))
215199

216200
override def withBootstrapServers(bootstrapServers: String): AdminClientSettings[F] =
217201
withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -279,11 +263,10 @@ object AdminClientSettings {
279263

280264
def apply[F[_]](implicit F: Sync[F]): AdminClientSettings[F] =
281265
AdminClientSettingsImpl(
282-
blocker = None,
283266
properties = Map.empty,
284267
closeTimeout = 20.seconds,
285268
createAdminClientWith = properties =>
286-
F.delay {
269+
F.blocking {
287270
AdminClient.create {
288271
(properties: Map[String, AnyRef]).asJava
289272
}

modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66

77
package fs2.kafka
88

9-
import cats.effect.Timer
9+
import cats.effect.Temporal
1010
import cats.syntax.applicativeError._
1111
import cats.syntax.flatMap._
1212
import cats.syntax.functor._
13-
import cats.{Functor, MonadError}
13+
import cats.Functor
1414
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException}
1515
import org.apache.kafka.common.TopicPartition
1616

@@ -44,9 +44,8 @@ abstract class CommitRecovery {
4444
offsets: Map[TopicPartition, OffsetAndMetadata],
4545
commit: F[Unit]
4646
)(
47-
implicit F: MonadError[F, Throwable],
48-
jitter: Jitter[F],
49-
timer: Timer[F]
47+
implicit F: Temporal[F],
48+
jitter: Jitter[F]
5049
): Throwable => F[Unit]
5150
}
5251

@@ -89,15 +88,14 @@ object CommitRecovery {
8988
offsets: Map[TopicPartition, OffsetAndMetadata],
9089
commit: F[Unit]
9190
)(
92-
implicit F: MonadError[F, Throwable],
93-
jitter: Jitter[F],
94-
timer: Timer[F]
91+
implicit F: Temporal[F],
92+
jitter: Jitter[F]
9593
): Throwable => F[Unit] = {
9694
def retry(attempt: Int): Throwable => F[Unit] = {
9795
case retriable: RetriableCommitFailedException =>
9896
val commitWithRecovery = commit.handleErrorWith(retry(attempt + 1))
99-
if (attempt <= 10) backoff(attempt).flatMap(timer.sleep) >> commitWithRecovery
100-
else if (attempt <= 15) timer.sleep(10.seconds) >> commitWithRecovery
97+
if (attempt <= 10) backoff(attempt).flatMap(F.sleep) >> commitWithRecovery
98+
else if (attempt <= 15) F.sleep(10.seconds) >> commitWithRecovery
10199
else F.raiseError(CommitRecoveryException(attempt - 1, retriable, offsets))
102100

103101
case nonRetriable: Throwable =>
@@ -120,9 +118,8 @@ object CommitRecovery {
120118
offsets: Map[TopicPartition, OffsetAndMetadata],
121119
commit: F[Unit]
122120
)(
123-
implicit F: MonadError[F, Throwable],
124-
jitter: Jitter[F],
125-
timer: Timer[F]
121+
implicit F: Temporal[F],
122+
jitter: Jitter[F]
126123
): Throwable => F[Unit] = F.raiseError
127124

128125
override def toString: String =

modules/core/src/main/scala/fs2/kafka/ConsumerResource.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
package fs2.kafka
88

9-
import cats.effect.{ConcurrentEffect, ContextShift, Resource, Timer}
9+
import cats.effect.{Resource, Async}
1010

1111
/**
1212
* [[ConsumerResource]] provides support for inferring the key and value
@@ -18,19 +18,18 @@ import cats.effect.{ConcurrentEffect, ContextShift, Resource, Timer}
1818
* }}}
1919
*/
2020
final class ConsumerResource[F[_]] private[kafka] (
21-
private val F: ConcurrentEffect[F]
21+
private val F: Async[F]
2222
) extends AnyVal {
2323

2424
/**
2525
* Creates a new [[KafkaConsumer]] in the `Resource` context.
2626
* This is equivalent to using `KafkaConsumer.resource` directly,
2727
* except we're able to infer the key and value type.
2828
*/
29-
def using[K, V](settings: ConsumerSettings[F, K, V])(
30-
implicit context: ContextShift[F],
31-
timer: Timer[F]
29+
def using[K, V](
30+
settings: ConsumerSettings[F, K, V]
3231
): Resource[F, KafkaConsumer[F, K, V]] =
33-
KafkaConsumer.resource(settings)(F, context, timer)
32+
KafkaConsumer.resource[F, K, V](settings)(F)
3433

3534
override def toString: String =
3635
"ConsumerResource$" + System.identityHashCode(this)

modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
package fs2.kafka
88

9-
import cats.effect.{Blocker, Sync}
9+
import cats.effect.{Sync}
1010
import cats.Show
1111
import fs2.kafka.internal.converters.collection._
1212
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -47,19 +47,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
4747
*/
4848
def valueDeserializer: F[Deserializer[F, V]]
4949

50-
/**
51-
* The `Blocker` to use for blocking Kafka operations. If not
52-
* explicitly provided, a default `Blocker` will be created
53-
* when creating a `KafkaConsumer` instance.
54-
*/
55-
def blocker: Option[Blocker]
56-
57-
/**
58-
* Returns a new [[ConsumerSettings]] instance with the
59-
* specified [[blocker]] to use for blocking operations.
60-
*/
61-
def withBlocker(blocker: Blocker): ConsumerSettings[F, K, V]
62-
6350
/**
6451
* Properties which can be provided when creating a Java `KafkaConsumer`
6552
* instance. Numerous functions in [[ConsumerSettings]] add properties
@@ -403,7 +390,6 @@ object ConsumerSettings {
403390
private[this] final case class ConsumerSettingsImpl[F[_], K, V](
404391
override val keyDeserializer: F[Deserializer[F, K]],
405392
override val valueDeserializer: F[Deserializer[F, V]],
406-
override val blocker: Option[Blocker],
407393
override val properties: Map[String, String],
408394
override val closeTimeout: FiniteDuration,
409395
override val commitTimeout: FiniteDuration,
@@ -414,8 +400,6 @@ object ConsumerSettings {
414400
override val maxPrefetchBatches: Int,
415401
val createConsumerWith: Map[String, String] => F[KafkaByteConsumer]
416402
) extends ConsumerSettings[F, K, V] {
417-
override def withBlocker(blocker: Blocker): ConsumerSettings[F, K, V] =
418-
copy(blocker = Some(blocker))
419403

420404
override def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] =
421405
withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -546,7 +530,6 @@ object ConsumerSettings {
546530
ConsumerSettingsImpl(
547531
keyDeserializer = keyDeserializer,
548532
valueDeserializer = valueDeserializer,
549-
blocker = None,
550533
properties = Map(
551534
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
552535
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"

modules/core/src/main/scala/fs2/kafka/ConsumerStream.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
package fs2.kafka
88

9-
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
9+
import cats.effect.Async
1010
import fs2.Stream
1111

1212
/**
@@ -19,19 +19,16 @@ import fs2.Stream
1919
* }}}
2020
*/
2121
final class ConsumerStream[F[_]] private[kafka] (
22-
private val F: ConcurrentEffect[F]
22+
private val F: Async[F]
2323
) extends AnyVal {
2424

2525
/**
2626
* Creates a new [[KafkaConsumer]] in the `Stream` context.
2727
* This is equivalent to using `KafkaConsumer.stream` directly,
2828
* except we're able to infer the key and value type.
2929
*/
30-
def using[K, V](settings: ConsumerSettings[F, K, V])(
31-
implicit context: ContextShift[F],
32-
timer: Timer[F]
33-
): Stream[F, KafkaConsumer[F, K, V]] =
34-
KafkaConsumer.stream(settings)(F, context, timer)
30+
def using[K, V](settings: ConsumerSettings[F, K, V]): Stream[F, KafkaConsumer[F, K, V]] =
31+
KafkaConsumer.stream[F, K, V](settings)(F)
3532

3633
override def toString: String =
3734
"ConsumerStream$" + System.identityHashCode(this)

modules/core/src/main/scala/fs2/kafka/Deserializer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ object Deserializer {
166166

167167
override def suspend: Deserializer[F, A] =
168168
Deserializer.instance { (topic, headers, bytes) =>
169-
F.suspend(deserialize(topic, headers, bytes))
169+
F.defer(deserialize(topic, headers, bytes))
170170
}
171171

172172
override def toString: String =

0 commit comments

Comments
 (0)