Skip to content

Commit 24f74bd

Browse files
Bugfix: Don't reset x-cache-ttl when expiring msg with delayed exchange
Previously, when using deduplication and delayed exchange together, the x-cache-ttl would be reset when a message was expired and re-published (if cache-ttl and delay was the same), meaning the cache_ttl was effectively more like cache_ttl + msg_delay. This changes the behavior so that a delayed message will not trigger deduplication by calling exchange.route_msg instead of vhost.publish. Also refactors exchange.publish to separate the msg routing to a separate route_msg function (was do_publish) that just does the msg routing.
1 parent 64d091d commit 24f74bd

File tree

5 files changed

+69
-24
lines changed

5 files changed

+69
-24
lines changed

spec/deduplication_spec.cr

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,5 +160,44 @@ describe LavinMQ::Deduplication::Deduper do
160160
calls.first[0].should eq "msg1"
161161
calls.first[1].should eq 10
162162
end
163+
164+
it "should not reset x-cache-ttl on expire when using delayed exchange" do
165+
cache_ttl = 1000
166+
x_args = AMQP::Client::Arguments.new({
167+
"x-delayed-exchange" => true,
168+
"x-delayed-type" => "topic",
169+
"x-message-deduplication" => true,
170+
"x-cache-ttl" => cache_ttl,
171+
"x-cache-size" => 1000,
172+
})
173+
q_name = "delayed_q"
174+
with_amqp_server do |s|
175+
with_channel(s) do |ch|
176+
x = ch.exchange("delayed_ex", "topic", args: x_args)
177+
q = ch.queue(q_name)
178+
q.bind(x.name, "#")
179+
hdrs = AMQP::Client::Arguments.new({
180+
"x-delay" => cache_ttl,
181+
"x-deduplication-header" => "msg1",
182+
})
183+
x.publish "test message", "rk", props: AMQP::Client::Properties.new(headers: hdrs)
184+
queue = s.vhosts["/"].queues[q_name]
185+
queue.message_count.should eq 0 # no message yet, delayed exchange
186+
187+
# second publish should be deduplicated
188+
sleep 100.milliseconds # wait for first publish to be processed
189+
x.publish "test message", "rk", props: AMQP::Client::Properties.new(headers: hdrs)
190+
wait_for { queue.message_count == 1 }
191+
192+
# get message, message_count should be 0
193+
ch.basic_get(q_name, no_ack: true)
194+
wait_for { queue.message_count == 0 }
195+
196+
# cache_ttl has passed, message should be delivered
197+
x.publish "test message", "rk", props: AMQP::Client::Properties.new(headers: hdrs)
198+
wait_for { queue.message_count == 1 }
199+
end
200+
end
201+
end
163202
end
164203
end

spec/exchange_spec.cr

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,13 @@ describe LavinMQ::Exchange do
228228
"x-deduplication-header" => "msg1",
229229
}))
230230
msg = LavinMQ::Message.new("ex", "rk", "body", props)
231-
ex.publish(msg, false).should eq 1
231+
ex.publish(msg, false).should eq true
232232
ex.dedup_count.should eq 0
233233
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
234234
"x-deduplication-header" => "msg1",
235235
}))
236236
msg = LavinMQ::Message.new("ex", "rk", "body", props)
237-
ex.publish(msg, false).should eq 0
237+
ex.publish(msg, false).should eq false
238238
ex.dedup_count.should eq 1
239239

240240
q.message_count.should eq 1
@@ -257,13 +257,13 @@ describe LavinMQ::Exchange do
257257
"custom" => "msg1",
258258
}))
259259
msg = LavinMQ::Message.new("ex", "rk", "body", props)
260-
ex.publish(msg, false).should eq 1
260+
ex.publish(msg, false).should eq true
261261
ex.dedup_count.should eq 0
262262
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
263263
"custom" => "msg1",
264264
}))
265265
msg = LavinMQ::Message.new("ex", "rk", "body", props)
266-
ex.publish(msg, false).should eq 0
266+
ex.publish(msg, false).should eq false
267267
ex.dedup_count.should eq 1
268268

269269
q.message_count.should eq 1

src/lavinmq/amqp/exchange/exchange.cr

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -196,43 +196,50 @@ module LavinMQ
196196

197197
def publish(msg : Message, immediate : Bool,
198198
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
199-
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : UInt32
199+
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Bool
200200
@publish_in_count.add(1, :relaxed)
201201
if d = @deduper
202202
if d.duplicate?(msg)
203203
@dedup_count.add(1, :relaxed)
204-
return 0u32
204+
return false
205205
end
206206
d.add(msg)
207207
end
208-
count = do_publish(msg, immediate, queues, exchanges)
209-
@unroutable_count.add(1, :relaxed) if count.zero?
210-
@publish_out_count.add(count, :relaxed)
211-
count
212-
end
213-
214-
private def do_publish(msg : Message, immediate : Bool, queues : Set(LavinMQ::Queue), exchanges : Set(LavinMQ::Exchange)) : UInt32
215-
headers = msg.properties.headers
216-
if should_delay_message?(headers)
208+
if should_delay_message?(msg.properties.headers)
217209
if q = @delayed_queue
218210
q.publish(msg)
219-
return 1u32
211+
@publish_out_count.add(1, :relaxed)
212+
return true
220213
else
221-
return 0u32
214+
@unroutable_count.add(1, :relaxed)
215+
return false
222216
end
223217
end
218+
route_msg(msg, immediate, queues, exchanges)
219+
end
220+
221+
def route_msg(msg : Message) : Bool
222+
route_msg(msg, false, Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new)
223+
end
224+
225+
private def route_msg(msg : Message, immediate : Bool, queues : Set(LavinMQ::Queue), exchanges : Set(LavinMQ::Exchange)) : Bool
226+
headers = msg.properties.headers
224227
find_queues(msg.routing_key, headers, queues, exchanges)
225-
return 0u32 if queues.empty?
226-
return 0u32 if immediate && !queues.any? &.immediate_delivery?
228+
if queues.empty? || (immediate && !queues.any? &.immediate_delivery?)
229+
@unroutable_count.add(1, :relaxed)
230+
return false
231+
end
227232

228233
count = 0u32
229234
queues.each do |queue|
230235
if queue.publish(msg)
231-
count += 1u32
236+
count += 1
232237
msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind
233238
end
234239
end
235-
count
240+
@publish_out_count.add(count, :relaxed)
241+
@unroutable_count.add(1, :relaxed) if count.zero?
242+
count.positive?
236243
end
237244

238245
def find_queues(routing_key : String, headers : AMQP::Table?,

src/lavinmq/amqp/queue/delayed_exchange_queue.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ module LavinMQ::AMQP
4949
headers.delete("x-delay")
5050
msg.properties.headers = headers
5151
end
52-
@vhost.publish Message.new(msg.timestamp, @exchange_name, msg.routing_key,
52+
@vhost.exchanges[@exchange_name].route_msg Message.new(msg.timestamp, @exchange_name, msg.routing_key,
5353
msg.properties, msg.bodysize, IO::Memory.new(msg.body))
5454
delete_message sp
5555
end

src/lavinmq/vhost.cr

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ module LavinMQ
124124
def publish(msg : Message, immediate = false,
125125
visited = Set(LavinMQ::Exchange).new, found_queues = Set(LavinMQ::Queue).new) : Bool
126126
ex = @exchanges[msg.exchange_name]? || return false
127-
published_queue_count = ex.publish(msg, immediate, found_queues, visited)
128-
!published_queue_count.zero?
127+
ex.publish(msg, immediate, found_queues, visited)
129128
ensure
130129
visited.clear
131130
found_queues.clear

0 commit comments

Comments
 (0)