diff --git a/build.sbt b/build.sbt index c0173183..cb8d6d79 100644 --- a/build.sbt +++ b/build.sbt @@ -64,7 +64,7 @@ val commonSettings = List( libraryDependencies ++= commonDependencies(scalaVersion.value), resolvers += "Apache public" at "https://repository.apache.org/content/groups/public/", scalafmtOnCompile := true, - mimaPreviousArtifacts := Set(organization.value %% moduleName.value % "4.1.0") + mimaPreviousArtifacts := Set(organization.value %% moduleName.value % "4.1.1") ) def CoreDependencies(scalaVersionStr: String): List[ModuleID] = diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala index 4581eb65..907b1508 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala @@ -94,16 +94,14 @@ object RabbitClient { threadFactory: Option[F[ThreadFactory]], executionContext: Option[F[ExecutionContext]] ) { - def withSslContext(sslContext: SSLContext): Builder[F] = new Builder[F]( - config = config, - sslContext = Some(sslContext), - saslConfig = saslConfig, - metricsCollector = metricsCollector, - threadFactory = threadFactory, - executionContext = executionContext - ) {} - - def withSaslConfig(saslConfig: SaslConfig): Builder[F] = new Builder[F]( + private def copy( + config: Fs2RabbitConfig = config, + sslContext: Option[SSLContext] = sslContext, + saslConfig: SaslConfig = saslConfig, + metricsCollector: Option[MetricsCollector] = metricsCollector, + threadFactory: Option[F[ThreadFactory]] = threadFactory, + executionContext: Option[F[ExecutionContext]] = executionContext + ): Builder[F] = new Builder[F]( config = config, sslContext = sslContext, saslConfig = saslConfig, @@ -112,38 +110,22 @@ object RabbitClient { executionContext = executionContext ) {} - def withMetricsCollector(metricsCollector: MetricsCollector): Builder[F] = new Builder[F]( - config = config, - sslContext = sslContext, - saslConfig = saslConfig, - metricsCollector = Some(metricsCollector), - threadFactory = threadFactory, - executionContext = executionContext - ) {} + def withSslContext(sslContext: SSLContext): Builder[F] = copy(sslContext = Some(sslContext)) - def withThreadFactory(threadFactory: F[ThreadFactory]): Builder[F] = new Builder[F]( - config = config, - sslContext = sslContext, - saslConfig = saslConfig, - metricsCollector = metricsCollector, - threadFactory = Some(threadFactory), - executionContext = executionContext - ) {} + def withSaslConfig(saslConfig: SaslConfig): Builder[F] = copy(saslConfig = saslConfig) + + def withMetricsCollector(metricsCollector: MetricsCollector): Builder[F] = + copy(metricsCollector = Some(metricsCollector)) + + def withThreadFactory(threadFactory: F[ThreadFactory]): Builder[F] = copy(threadFactory = Some(threadFactory)) def withExecutionContext(executionContext: F[ExecutionContext]): Builder[F] = - new Builder[F]( - config = config, - sslContext = sslContext, - saslConfig = saslConfig, - metricsCollector = metricsCollector, - threadFactory = threadFactory, - executionContext = Some(executionContext) - ) {} + copy(executionContext = Some(executionContext)) def build(dispatcher: Dispatcher[F]): F[RabbitClient[F]] = create[F](config, dispatcher, sslContext, saslConfig, metricsCollector, threadFactory, executionContext) - def resource(): Resource[F, RabbitClient[F]] = + def resource: Resource[F, RabbitClient[F]] = Dispatcher[F].evalMap(build) } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala index 9ca2b22d..cf7f39f6 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala @@ -68,7 +68,7 @@ object DropwizardMetricsDemo extends IOApp.Simple { val resources = for { _ <- JmxReporterResource.make[IO](registry) - client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource() + client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource channel <- client.createConnection.flatMap(client.createChannel) } yield (channel, client) diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala index f2595a85..68379693 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala @@ -42,7 +42,7 @@ object IOAckerConsumer extends IOApp.Simple { ) override def run: IO[Unit] = - RabbitClient.default[IO](config).resource().use { client => + RabbitClient.default[IO](config).resource.use { client => ResilientStream .runF(new AckerConsumerDemo[IO](client).program) } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala index c100d9c9..4036209a 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala @@ -54,7 +54,7 @@ object RPCDemo extends IOApp.Simple { ) def run: IO[Unit] = - RabbitClient.default[IO](config).resource().use { implicit client => + RabbitClient.default[IO](config).resource.use { implicit client => val queue = QueueName("rpc_queue") runServer[IO](queue).concurrently(runClient[IO](queue)).compile.drain } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala index ba967dfb..36adee52 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala @@ -43,7 +43,7 @@ object ZIOAutoAckConsumer extends CatsApp { override def run(args: List[String]): URIO[ZEnv, ExitCode] = RabbitClient .default[Task](config) - .resource() + .resource .use { client => ResilientStream .runF(new AutoAckConsumerDemo[Task](client).program) diff --git a/site/docs/config.md b/site/docs/config.md index e74f61f8..89453779 100644 --- a/site/docs/config.md +++ b/site/docs/config.md @@ -30,7 +30,8 @@ val config = Fs2RabbitConfig( requeueOnReject = false, internalQueueSize = Some(500), requestedHeartbeat = 30.seconds, - automaticRecovery = true + automaticRecovery = true, + clientProvidedConnectionName = Some("app:rabbit") ) ``` diff --git a/site/docs/examples/client-metrics.md b/site/docs/examples/client-metrics.md index 42103109..4333b2cf 100644 --- a/site/docs/examples/client-metrics.md +++ b/site/docs/examples/client-metrics.md @@ -17,7 +17,7 @@ val dropwizardCollector = new StandardMetricsCollector(registry) Now it is ready to use. ```scala -RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource() +RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource ``` ## Expose via JMX @@ -54,7 +54,7 @@ Let's initialise the FS2 RabbitMQ client and AMQP channel with metrics. ```scala val resources = for { _ <- JmxReporterResource.make[IO](registry) - client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource() + client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource channel <- client.createConnection.flatMap(client.createChannel) } yield (channel, client) @@ -109,7 +109,8 @@ object DropwizardMetricsDemo extends IOApp { requeueOnReject = false, internalQueueSize = Some(500), requestedHeartbeat = 60.seconds, - automaticRecovery = true + automaticRecovery = true, + clientProvidedConnectionName = Some("app:rabbit") ) private val queueName = QueueName("testQ") @@ -129,7 +130,7 @@ object DropwizardMetricsDemo extends IOApp { val resources = for { _ <- JmxReporterResource.make[IO](registry) - client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource() + client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource channel <- client.createConnection.flatMap(client.createChannel) } yield (channel, client) diff --git a/site/docs/examples/sample-acker.md b/site/docs/examples/sample-acker.md index 36500110..c328b2c6 100644 --- a/site/docs/examples/sample-acker.md +++ b/site/docs/examples/sample-acker.md @@ -117,11 +117,12 @@ object IOAckerConsumer extends IOApp { requeueOnReject = false, internalQueueSize = Some(500), requestedHeartbeat = 60.seconds, - automaticRecovery = true + automaticRecovery = true, + clientProvidedConnectionName = Some("app:rabbit") ) override def run(args: List[String]): IO[ExitCode] = - RabbitClient.default[IO](config).resource().use { client => + RabbitClient.default[IO](config).resource.use { client => ResilientStream .runF(new AckerConsumerDemo[IO](client).program) .as(ExitCode.Success) diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index a8aa5f3f..a8ef7d00 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -757,7 +757,7 @@ trait Fs2RabbitSpec { self: BaseSpec => private def withStreamRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] = RabbitClient .default[IO](config) - .resource() + .resource .use(r => fa(r).compile.drain) .as(emptyAssertion) .unsafeToFuture() @@ -765,7 +765,7 @@ trait Fs2RabbitSpec { self: BaseSpec => private def withStreamNackRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] = RabbitClient .default[IO](config.copy(requeueOnNack = true)) - .resource() + .resource .use(r => fa(r).compile.drain) .as(emptyAssertion) .unsafeToFuture() @@ -773,7 +773,7 @@ trait Fs2RabbitSpec { self: BaseSpec => private def withStreamRejectRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] = RabbitClient .default[IO](config.copy(requeueOnReject = true)) - .resource() + .resource .use(r => fa(r).compile.drain) .as(emptyAssertion) .unsafeToFuture() @@ -781,7 +781,7 @@ trait Fs2RabbitSpec { self: BaseSpec => private def withRabbit[A](fa: RabbitClient[IO] => IO[A]): Future[A] = RabbitClient .default[IO](config) - .resource() + .resource .use(r => fa(r)) .unsafeToFuture()