Skip to content

Commit 2ae3814

Browse files
retrikujbwheatley
authored andcommitted
Use thread factory
1 parent eaca068 commit 2ae3814

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@ import dev.profunktor.fs2rabbit.model.RabbitConnection
3636
import java.util.Collections
3737
import java.util.concurrent.AbstractExecutorService
3838
import java.util.concurrent.ExecutorService
39-
import java.util.concurrent.Executors
4039
import java.util.concurrent.ThreadFactory
4140
import java.util.concurrent.TimeUnit
4241
import javax.net.ssl.SSLContext
4342
import scala.concurrent.ExecutionContext
4443
import scala.jdk.CollectionConverters._
44+
import java.util.concurrent.Executors
4545

4646
object ConnectionResource {
4747
type ConnectionResource[F[_]] = Connection[Resource[F, *]]
@@ -63,20 +63,28 @@ object ConnectionResource {
6363
}
6464
}
6565

66-
val numOfThreads = Runtime.getRuntime().availableProcessors() * 2
67-
val es = Executors.newFixedThreadPool(numOfThreads)
68-
sys.addShutdownHook(es.shutdown())
66+
val numOfThreads = Runtime.getRuntime().availableProcessors() * 2
67+
val esF: F[ExecutorService] = threadFactory
68+
.fold(Executors.newFixedThreadPool(numOfThreads).pure[F]) {
69+
_.map(Executors.newFixedThreadPool(numOfThreads, _))
70+
}
71+
.map { es =>
72+
sys.addShutdownHook(es.shutdown())
73+
es
74+
}
6975

70-
addThreadFactory.flatMap { fn =>
71-
_make(
72-
conf,
73-
Some(ExecutionContext.fromExecutorService(es)),
74-
sslCtx,
75-
saslConf,
76-
metricsCollector,
77-
fn
78-
)
79-
}
76+
for {
77+
es <- esF
78+
fn <- addThreadFactory
79+
conn <- _make(
80+
conf,
81+
Some(ExecutionContext.fromExecutorService(es)),
82+
sslCtx,
83+
saslConf,
84+
metricsCollector,
85+
fn
86+
)
87+
} yield conn
8088
}
8189

8290
def make[F[_]: Sync: Log](

0 commit comments

Comments
 (0)