Skip to content

Commit 0d5f70d

Browse files
authored
Merge pull request #946 from geirolz/add-queue-type
Add QueueType model
2 parents 8148b8e + 723f7c2 commit 0d5f70d

File tree

3 files changed

+72
-24
lines changed

3 files changed

+72
-24
lines changed

core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala

+23-19
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package dev.profunktor.fs2rabbit.algebra
1818

1919
import cats.effect.Sync
20-
import cats.syntax.functor._
20+
import cats.syntax.all._
2121
import dev.profunktor.fs2rabbit.arguments._
2222
import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig}
2323
import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._
@@ -63,26 +63,30 @@ object Declaration {
6363
}
6464

6565
override def declareQueue(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] =
66-
Sync[F].blocking {
67-
channel.value.queueDeclare(
68-
config.queueName.value,
69-
config.durable.isTrue,
70-
config.exclusive.isTrue,
71-
config.autoDelete.isTrue,
72-
config.arguments
73-
)
74-
}.void
66+
Sync[F].fromEither(config.validatedArguments).flatMap { args =>
67+
Sync[F].blocking {
68+
channel.value.queueDeclare(
69+
config.queueName.value,
70+
config.durable.isTrue,
71+
config.exclusive.isTrue,
72+
config.autoDelete.isTrue,
73+
args
74+
)
75+
}.void
76+
}
7577

7678
override def declareQueueNoWait(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] =
77-
Sync[F].blocking {
78-
channel.value.queueDeclareNoWait(
79-
config.queueName.value,
80-
config.durable.isTrue,
81-
config.exclusive.isTrue,
82-
config.autoDelete.isTrue,
83-
config.arguments
84-
)
85-
}.void
79+
Sync[F].fromEither(config.validatedArguments).flatMap { args =>
80+
Sync[F].blocking {
81+
channel.value.queueDeclareNoWait(
82+
config.queueName.value,
83+
config.durable.isTrue,
84+
config.exclusive.isTrue,
85+
config.autoDelete.isTrue,
86+
args
87+
)
88+
}.void
89+
}
8690

8791
override def declareQueuePassive(channel: AMQPChannel, queueName: QueueName): F[Unit] =
8892
Sync[F].blocking {

core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala

+36-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package dev.profunktor.fs2rabbit.config
1818

1919
import dev.profunktor.fs2rabbit.arguments.Arguments
20-
import dev.profunktor.fs2rabbit.model.{ExchangeName, ExchangeType, QueueName}
20+
import dev.profunktor.fs2rabbit.model.{ExchangeName, ExchangeType, QueueName, QueueType}
2121

2222
object declaration {
2323

@@ -26,12 +26,44 @@ object declaration {
2626
durable: DurableCfg,
2727
exclusive: ExclusiveCfg,
2828
autoDelete: AutoDeleteCfg,
29-
arguments: Arguments
30-
)
29+
arguments: Arguments,
30+
queueType: Option[QueueType]
31+
) {
32+
33+
lazy val validatedArguments: Either[IllegalArgumentException, Arguments] =
34+
queueType match {
35+
case Some(_) if arguments.contains("x-queue-type") =>
36+
Left(
37+
new IllegalArgumentException(
38+
"Queue type defined twice. It is set in the arguments and in the DeclarationQueueConfig."
39+
)
40+
)
41+
case Some(queueType) =>
42+
Right(arguments + ("x-queue-type" -> queueType.asString))
43+
case None =>
44+
Right(arguments)
45+
}
46+
}
3147
object DeclarationQueueConfig {
3248

3349
def default(queueName: QueueName): DeclarationQueueConfig =
34-
DeclarationQueueConfig(queueName, NonDurable, NonExclusive, NonAutoDelete, Map.empty)
50+
DeclarationQueueConfig(
51+
queueName = queueName,
52+
durable = NonDurable,
53+
exclusive = NonExclusive,
54+
autoDelete = NonAutoDelete,
55+
arguments = Map.empty,
56+
queueType = None
57+
)
58+
59+
def classic(queueName: QueueName): DeclarationQueueConfig =
60+
default(queueName).copy(queueType = Some(QueueType.Classic))
61+
62+
def quorum(queueName: QueueName): DeclarationQueueConfig =
63+
default(queueName).copy(queueType = Some(QueueType.Quorum))
64+
65+
def stream(queueName: QueueName): DeclarationQueueConfig =
66+
default(queueName).copy(queueType = Some(QueueType.Stream))
3567
}
3668

3769
sealed trait DurableCfg extends Product with Serializable

core/src/main/scala/dev/profunktor/fs2rabbit/model.scala

+13-1
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,20 @@ object model {
9393
extends ExchangeType // for use with the plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/
9494
}
9595

96-
sealed abstract class DeliveryMode(val value: Int) extends Product with Serializable
96+
sealed trait QueueType extends Product with Serializable {
97+
def asString: String = this match {
98+
case QueueType.Classic => "classic"
99+
case QueueType.Quorum => "quorum"
100+
case QueueType.Stream => "stream"
101+
}
102+
}
103+
object QueueType {
104+
case object Classic extends QueueType
105+
case object Quorum extends QueueType
106+
case object Stream extends QueueType
107+
}
97108

109+
sealed abstract class DeliveryMode(val value: Int) extends Product with Serializable
98110
object DeliveryMode {
99111
case object NonPersistent extends DeliveryMode(1)
100112
case object Persistent extends DeliveryMode(2)

0 commit comments

Comments
 (0)