diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala index 9e417715..51267d6d 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala @@ -36,6 +36,7 @@ import dev.profunktor.fs2rabbit.model.RabbitConnection import java.util.Collections import java.util.concurrent.AbstractExecutorService import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.ThreadFactory import java.util.concurrent.TimeUnit import javax.net.ssl.SSLContext @@ -45,6 +46,39 @@ import scala.jdk.CollectionConverters._ object ConnectionResource { type ConnectionResource[F[_]] = Connection[Resource[F, *]] + @deprecated(message = "Use `make` with explicit ExecutionContext", since = "5.0.0") + def make[F[_]: Sync: Log]( + conf: Fs2RabbitConfig, + sslCtx: Option[SSLContext] = None, + // Unlike SSLContext, SaslConfig is not optional because it is always set + // by the underlying Java library, even if the user doesn't set it. + saslConf: SaslConfig = DefaultSaslConfig.PLAIN, + metricsCollector: Option[MetricsCollector] = None, + threadFactory: Option[F[ThreadFactory]] = None + ): F[Connection[Resource[F, *]]] = { + val addThreadFactory: F[ConnectionFactory => Unit] = + threadFactory.fold(Sync[F].pure((_: ConnectionFactory) => ())) { threadFact => + threadFact.map { tf => (cf: ConnectionFactory) => + cf.setThreadFactory(tf) + } + } + + val numOfThreads = Runtime.getRuntime().availableProcessors() * 2 + val es = Executors.newFixedThreadPool(numOfThreads) + sys.addShutdownHook(es.shutdown()) + + addThreadFactory.flatMap { fn => + _make( + conf, + Some(ExecutionContext.fromExecutorService(es)), + sslCtx, + saslConf, + metricsCollector, + fn + ) + } + } + def make[F[_]: Sync: Log]( conf: Fs2RabbitConfig, executionContext: ExecutionContext, diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala index 3cc96bfe..4581eb65 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala @@ -40,6 +40,50 @@ import javax.net.ssl.SSLContext import scala.concurrent.ExecutionContext object RabbitClient { + @deprecated(message = "Use `default` to create Builder instead", since = "5.0.0") + def apply[F[_]: Async]( + config: Fs2RabbitConfig, + dispatcher: Dispatcher[F], + sslContext: Option[SSLContext] = None, + // Unlike SSLContext, SaslConfig is not optional because it is always set + // by the underlying Java library, even if the user doesn't set it. + saslConfig: SaslConfig = DefaultSaslConfig.PLAIN, + metricsCollector: Option[MetricsCollector] = None, + threadFactory: Option[F[ThreadFactory]] = None + ): F[RabbitClient[F]] = { + val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500)) + val connection = ConnectionResource.make(config, sslContext, saslConfig, metricsCollector, threadFactory) + val consumingProgram = AckConsumingProgram.make[F](config, internalQ, dispatcher) + val publishingProgram = PublishingProgram.make[F](dispatcher) + val bindingClient = Binding.make[F] + val declarationClient = Declaration.make[F] + val deletionClient = Deletion.make[F] + + connection.map { conn => + new RabbitClient[F]( + conn, + bindingClient, + declarationClient, + deletionClient, + consumingProgram, + publishingProgram + ) + } + } + + @deprecated(message = "Use `default` to create Builder instead", since = "5.0.0") + def resource[F[_]: Async]( + config: Fs2RabbitConfig, + sslContext: Option[SSLContext] = None, + // Unlike SSLContext, SaslConfig is not optional because it is always set + // by the underlying Java library, even if the user doesn't set it. + saslConfig: SaslConfig = DefaultSaslConfig.PLAIN, + metricsCollector: Option[MetricsCollector] = None, + threadFactory: Option[F[ThreadFactory]] = None + ): Resource[F, RabbitClient[F]] = Dispatcher[F].evalMap { dispatcher => + apply[F](config, dispatcher, sslContext, saslConfig, metricsCollector, threadFactory) + } + sealed abstract class Builder[F[_]: Async] private[RabbitClient] ( config: Fs2RabbitConfig, sslContext: Option[SSLContext],