Upgrading
I completely blew my commitment to keep API changes to a minimum from RC1 onward. As I was working on the tutorial, several warts kept nagging at me, and I decided it would be best to make radical API changes while the RC label is still intact.
This represents one of the largest API changes of all. I'm seriously excited about it.
- `TopicMessage`, `QueueMessage`, and `ExchangeMessage` objects have become `Message.topic`, `Message.queue`, `Message.exchange`, etc.
- `ConfirmedMessage` has become simply `Message`, as it is the default.
- `UnconfirmedMessage` gained a symmetrical factory function API (`UnconfirmedMessage.topic`, `UnconfirmedMessage.factory`, etc.).
- `QueuePublisher`, `ExchangePublisher`, and `TopicPublisher` have become `Publisher.queue`, `Publisher.exchange`, `Publisher.topic`, etc. There is only one `Publisher` instance. The factory methods are just for convenience and offer defaults that make sense for each of the scenarios.
- `QueueBinding` has become `Queue`; `ExchangeBinding` has become `Exchange`.
- `TopicBinding`, `HeadersBinding`, and `FanoutBinding` receive a `Queue` definition and an `Exchange` definition, rather than having the creation parameters for each flattened into one argument list.
- `PassiveQueueBinding` has become `Queue.passive`. `Queue.passive` can receive a `Queue` definition, which will be used to declare the queue if it doesn't already exist.
- `Exchange.passive` has been created; similarly, it can receive a non-passive `Exchange` definition.
- `Exchange` definitions are generically typed; if you pass an `Exchange[Exchange.Topic.Value]` to a `HeadersBinding`, the compiler will yell at you.
- Modeled `Queue` / `Exchange` arguments have been introduced, providing compiler-level safety for both argument names and types. Where a duration is concerned, the modeled argument receives a `FiniteDuration`, which it maps to an integer number of milliseconds.
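To illustrate how a generically typed exchange definition lets the compiler reject a mismatched binding, here is a minimal, self-contained sketch. Every name in it (`ExchangeDef`, `QueueDef`, `topicBinding`, `headersBinding`, and the marker traits) is hypothetical and chosen for illustration; this is not op-rabbit's actual source, only the general phantom-type technique.

```scala
object TypedExchangeSketch {
  // Hypothetical marker types naming the kind of exchange.
  sealed trait Kind
  sealed trait Topic extends Kind
  sealed trait Headers extends Kind

  // K is a phantom type: it carries no data and exists only for the compiler.
  final case class ExchangeDef[K <: Kind](name: String)
  final case class QueueDef(name: String)

  def topicExchange(name: String): ExchangeDef[Topic] = ExchangeDef[Topic](name)
  def headersExchange(name: String): ExchangeDef[Headers] = ExchangeDef[Headers](name)

  // Each binding only accepts an exchange of the matching kind.
  def topicBinding(queue: QueueDef, exchange: ExchangeDef[Topic], topics: List[String]): String =
    s"topic:${exchange.name}->${queue.name}[${topics.mkString(",")}]"

  def headersBinding(queue: QueueDef, exchange: ExchangeDef[Headers]): String =
    s"headers:${exchange.name}->${queue.name}"

  def main(args: Array[String]): Unit = {
    val q = QueueDef("orders")
    println(topicBinding(q, topicExchange("events"), List("order.#")))
    println(headersBinding(q, headersExchange("audit")))
    // headersBinding(q, topicExchange("events")) // rejected at compile time: type mismatch
  }
}
```

Because `ExchangeDef` is invariant in `K`, an `ExchangeDef[Topic]` is never accepted where an `ExchangeDef[Headers]` is required, so the mistake surfaces at compile time rather than at runtime.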
Here's a complex example using the new Queue definition syntax, showing how easy it is to passively declare a queue, and create it if it doesn't exist.
import scala.concurrent.duration._ // provides 30.seconds, 5.minutes
import Queue.ModeledArgs._

Queue.passive(
  Queue(
    s"op-rabbit.retry.${queueName}",
    durable = true,
    arguments = Seq(
      `x-expires`(30.seconds),
      `x-message-ttl`(5.minutes),
      `x-dead-letter-exchange`(""), // default exchange
      `x-dead-letter-routing-key`(queueName))))
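As a rough picture of what a modeled argument does, the sketch below pairs an argument name with a typed value and converts a `FiniteDuration` to integer milliseconds. The `ModeledArg` class and the `ModeledArgsSketch` object are assumptions made up for this illustration; op-rabbit's real `Queue.ModeledArgs` may be built quite differently.

```scala
import scala.concurrent.duration._

object ModeledArgsSketch {
  // Hypothetical typed argument: a fixed wire name plus a conversion to the raw value.
  final class ModeledArg[A](val name: String, toWire: A => Any) {
    def apply(value: A): (String, Any) = name -> toWire(value)
  }

  // Durations are accepted as FiniteDuration and emitted as integer milliseconds.
  val `x-expires` = new ModeledArg[FiniteDuration]("x-expires", _.toMillis.toInt)
  val `x-message-ttl` = new ModeledArg[FiniteDuration]("x-message-ttl", _.toMillis.toInt)
  val `x-dead-letter-exchange` = new ModeledArg[String]("x-dead-letter-exchange", (s: String) => s)

  def main(args: Array[String]): Unit = {
    val arguments: Map[String, Any] = Map(
      `x-expires`(30.seconds),
      `x-message-ttl`(5.minutes),
      `x-dead-letter-exchange`(""))
    println(arguments("x-message-ttl")) // 300000
    // `x-message-ttl`("5 minutes") // rejected at compile time: a FiniteDuration is required
  }
}
```

A misspelled argument name fails to resolve and a wrongly typed value fails to type-check, which is the kind of compiler-level safety described above.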
- `RecoveryStrategy.limitedRedeliver` uses a RabbitMQ queue with dead-letter forwarding options and a TTL to handle message retry. The messages still go to the back of the line, but the consumer is no longer slowed down in failure mode.
- The `Directives` queue binding DSL has been updated to incorporate the above changes.
- `Subscription.register` became `Subscription.run`. The same change applies to the `register` method on a `Subscription` definition instance.
- `com.spingo.op_rabbit.consumer._` was moved into `com.spingo.op_rabbit._`; update your imports accordingly.
- The method of instantiating `Subscription` has changed substantially. `Subscription` is now completely stateless.
- Casting header values in the Handler DSL has changed, and now supports `optionalProperty`.
Before, subscriptions were declared and registered like this:
val subscription = new Subscription {
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  def config = channel(qos = 3) {
    consume(topic("such-message-queue", List("some-topic.#"))) {
      body(as[Person]) { person =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        ack()
      }
    }
  }
}
rabbitControl ! subscription
subscription.initialized.foreach { _ =>
  println("Initialized!")
  subscription.close()
}
`Subscription` had a `close()` method and a `closed` property. Neither exists on `Subscription` anymore.
The above is rewritten as follows for v1.0.0-RC1:
val subscription = Subscription {
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  def config = channel(qos = 3) {
    consume(topic("such-message-queue", List("some-topic.#"))) {
      body(as[Person]) { person =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        ack()
      }
    }
  }
}
val subscriptionRef = subscription.register(rabbitControl)
subscriptionRef.initialized.foreach { _ =>
  println("Initialized!")
  subscriptionRef.close()
}
Before, header values were cast as follows:
property(Header("x-retries")).as(typeOf[Int])
This did not work for `optionalProperty`. The old way no longer works, and the new way is shorter:
property(Header("x-retries").as[Int])
optionalProperty(Header("x-retries").as[Int])
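The following toy model shows one way a type attached to the header itself can serve both a required and an optional extractor. All types here (`Conversion`, `TypedHeader`, and this `Header`, `property`, and `optionalProperty`) are hypothetical stand-ins written for the sketch; they only mimic the call shape above and do not reproduce op-rabbit's implementation.

```scala
object TypedHeaderSketch {
  // Hypothetical conversion type class from a raw header value to T.
  trait Conversion[T] { def convert(raw: Any): Option[T] }

  implicit val intConversion: Conversion[Int] = new Conversion[Int] {
    def convert(raw: Any): Option[Int] = raw match {
      case i: Int    => Some(i)
      case s: String => scala.util.Try(s.toInt).toOption
      case _         => None
    }
  }

  // A header name that already knows the type it should be read as.
  final case class TypedHeader[T](name: String)(implicit conv: Conversion[T]) {
    def extract(headers: Map[String, Any]): Option[T] =
      headers.get(name).flatMap(conv.convert)
  }

  final case class Header(name: String) {
    def as[T: Conversion]: TypedHeader[T] = TypedHeader[T](name)
  }

  // Required: a missing or unconvertible header is an error.
  def property[T](h: TypedHeader[T])(headers: Map[String, Any]): Either[String, T] =
    h.extract(headers).toRight(s"missing or invalid header ${h.name}")

  // Optional: a missing or unconvertible header is simply None.
  def optionalProperty[T](h: TypedHeader[T])(headers: Map[String, Any]): Option[T] =
    h.extract(headers)

  def main(args: Array[String]): Unit = {
    val headers: Map[String, Any] = Map("x-retries" -> 2)
    println(property(Header("x-retries").as[Int])(headers))
    println(optionalProperty(Header("x-retries").as[Int])(Map.empty))
  }
}
```

In this model the same `Header("x-retries").as[Int]` value is passed to either extractor, so the cast does not need to be repeated or adapted per extractor.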