Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions spec/queue_dead_lettering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,59 @@ module DeadLetteringSpec
end
end

it "should not stack overflow with two mutually dead-lettering queues" do
with_amqp_server do |s|
with_channel(s) do |ch|
n = 10_000
q1 = ch.queue("dlx_mutual_q1", args: AMQP::Client::Arguments.new({
"x-max-length" => n,
"x-dead-letter-exchange" => "",
"x-dead-letter-routing-key" => "dlx_mutual_q2",
}))
q2 = ch.queue("dlx_mutual_q2", args: AMQP::Client::Arguments.new({
"x-max-length" => n,
"x-dead-letter-exchange" => "",
"x-dead-letter-routing-key" => "dlx_mutual_q1",
}))
n.times { |i| ch.default_exchange.publish("q1-#{i}", q1.name) }
n.times { |i| ch.default_exchange.publish("q2-#{i}", q2.name) }
wait_for { q1.message_count == n && q2.message_count == n }
ch.default_exchange.publish("trigger", q1.name)
wait_for(30.seconds) { q1.message_count == n && q2.message_count == n }

q1_msg = q1.get(no_ack: true).should_not be_nil
q1_msg.body_io.to_s.should eq "q2-0"

q2_msg = q2.get(no_ack: true).should_not be_nil
q2_msg.body_io.to_s.should eq "q1-1"
end
end
end

it "should not stack overflow with a long chain" do
with_amqp_server do |s|
with_channel(s) do |ch|
n = 200
queues = Array(AMQP::Client::Queue).new(n)
n.times do |i|
qargs = if i < n - 1
{
"x-max-length" => 1,
"x-dead-letter-exchange" => "",
"x-dead-letter-routing-key" => "dlx_chain_q#{i + 1}",
}
else
{"x-max-length" => 1}
end
queues << ch.queue("dlx_chain_q#{i}", args: AMQP::Client::Arguments.new(qargs))
end
queues.each { |q| ch.default_exchange.publish("fill", q.name) }
ch.default_exchange.publish("trigger", queues.first.name)
wait_for { queues.last.message_count == 1 }
end
end
end

it "should detect cycle with three queues in chain" do
with_amqp_server do |s|
with_channel(s) do |ch|
Expand Down Expand Up @@ -940,5 +993,67 @@ module DeadLetteringSpec
# end
end
end

describe "Routed Callback" do
# The routed callback deletes the source message from storage.
# With a fanout DLX routing to two queues, the old code stored
# the same callback once per destination and called it N times.
# Correct behavior: source empties to 0 (called at least once)
# and both destinations receive all messages (routing completed).
# A double-call would attempt to delete the same segment twice,
# risking store corruption or stat errors.
it "is called once when dead lettering to multiple queues" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.exchange_declare("dlx_fanout", "fanout")
src = ch.queue("src_fanout", args: AMQP::Client::Arguments.new({
"x-dead-letter-exchange" => "dlx_fanout",
}))
dst1 = ch.queue("dst_fanout_1")
dst2 = ch.queue("dst_fanout_2")
dst1.bind("dlx_fanout", "")
dst2.bind("dlx_fanout", "")

n = 3
n.times { |i| ch.default_exchange.publish_confirm("msg#{i + 1}", src.name) }
get_n(n, src, &.reject(requeue: false))

wait_for { src.message_count == 0 }
wait_for { dst1.message_count == n }
wait_for { dst2.message_count == n }

src.message_count.should eq 0
dst1.message_count.should eq n
dst2.message_count.should eq n
end
end
end

# When cycle detection drops all destinations the routed callback
# must still fire so the source message is removed from storage.
# The queue self-binds to the fanout DLX: on first TTL the message
# is re-queued with an x-death entry; on the second TTL the cycle
# is detected and every destination is skipped. Without the fix
# the callback is never enqueued and the message leaks, leaving
# message_count > 0 after the sleep.
it "is called once when all dead letter destinations are cycles" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.exchange_declare("dlx_fanout_cycle", "fanout")
q = ch.queue("q_fanout_cycle", args: AMQP::Client::Arguments.new({
"x-message-ttl" => 1,
"x-dead-letter-exchange" => "dlx_fanout_cycle",
}))
q.bind("dlx_fanout_cycle", "")

2.times { |i| ch.default_exchange.publish_confirm("msg#{i + 1}", q.name) }

sleep 0.1.seconds

q.message_count.should eq 0
end
end
end
end
end
end
67 changes: 57 additions & 10 deletions src/lavinmq/amqp/argument/dead_lettering.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,25 @@ module LavinMQ::AMQP
add_argument_validator "x-dead-letter-routing-key", ArgumentValidator::DeadLetteringValidator.new
end

alias MessageRoutedCallback = Proc(Nil)
alias Task = {AMQP::Queue, Message} | MessageRoutedCallback

struct Context
@pending = Deque(Task).new

def <<(task : Task)
@pending << task
end

def shift?
@pending.shift?
end
end

class DeadLetterer
property dlx : String? = nil
property dlrk : String? = nil
@context = Context.new

def initialize(@vhost : VHost, @queue_name : String, @log : Logger)
end
Expand Down Expand Up @@ -42,10 +58,12 @@ module LavinMQ::AMQP
# It's done like this to be able to dead letter to all destinations
# except to the queue itself if a cycle is detected.
# This is also how it's done in rabbitmq
def route(msg : BytesMessage, reason)

# ameba:disable Metrics/CyclomaticComplexity
def route(msg : BytesMessage, reason, dlx_context : Context? = nil, &routed : MessageRoutedCallback) : Nil
# No dead letter exchange => nothing to do
return unless dlx = (msg.dlx || dlx())
ex = @vhost.exchanges[dlx.to_s]?.as?(AMQP::Exchange) || return
return routed.call unless dlx = (msg.dlx || dlx())
ex = @vhost.exchanges[dlx.to_s]?.as?(AMQP::Exchange) || return routed.call

dlrk = msg.dlrk || dlrk()

Expand All @@ -69,18 +87,47 @@ module LavinMQ::AMQP
# if the dead letter exchange has any of these features enabled.
queues = Set(AMQP::Queue).new
ex.find_queues(routing_rk, routing_headers, queues)
return if queues.empty?
return routed.call if queues.empty?

first_in_chain = dlx_context.nil?
ctx = dlx_context || @context

dead_lettered_msg = Message.new(
dead_letter_msg = Message.new(
RoughTime.unix_ms, dlx.to_s, routing_rk.to_s,
props, msg.bodysize, IO::Memory.new(msg.body))

queues.each do |q|
next if cycle?(q.name, props, reason)
@log.trace { "dead lettering dest=#{q.name} msg=#{dead_lettered_msg}" }
q.publish(dead_lettered_msg)
rescue ex
@log.warn(exception: ex) { "Unexpected error when dead-lettering to #{q.name}" }
if cycle?(q.name, props, reason)
@log.trace { "dead lettering cycle dest=#{q.name} msg=#{dead_letter_msg}" }
else
@log.trace { "dead lettering dest=#{q.name} msg=#{dead_letter_msg}" }
ctx << {q, dead_letter_msg}
end
end
ctx << routed

drain_context if first_in_chain
end

private def drain_context
while task = @context.shift?
case task
when MessageRoutedCallback
begin
task.call
rescue ex
@log.warn(exception: ex) { "Unexpected error in dead letter routed callback" }
end
else
dst_q, msg = task
begin
dst_q.publish_internal(msg, dlx_context: @context)
rescue Queue::RejectOverFlow
# noop
rescue ex
@log.warn(exception: ex) { "Unexpected error when dead lettering to #{dst_q.name}, messages dropped" }
end
end
end
end

Expand Down
5 changes: 4 additions & 1 deletion src/lavinmq/amqp/queue/delayed_exchange_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ module LavinMQ::AMQP
end

def publish(message : Message) : Bool
# This queue should never be published too
false
end

protected def publish_internal(message : Message, dlx_context : Argument::DeadLettering::Context?) : Bool
false
end

Expand Down
49 changes: 32 additions & 17 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,20 @@ module LavinMQ::AMQP
unless @max_length.try &.< value.as_i64
@max_length = value.as_i64
@effective_args.delete("x-max-length")
drop_overflow
spawn do
@vhost.closed.when_false.receive?
drop_overflow
end
return true
end
when "max-length-bytes"
unless @max_length_bytes.try &.< value.as_i64
@max_length_bytes = value.as_i64
@effective_args.delete("x-max-length-bytes")
drop_overflow
spawn do
@vhost.closed.when_false.receive?
drop_overflow
end
return true
end
when "message-ttl"
Expand Down Expand Up @@ -323,7 +329,10 @@ module LavinMQ::AMQP
unless @delivery_limit.try &.< value.as_i64
@delivery_limit = value.as_i64
@effective_args.delete("x-delivery-limit")
drop_redelivered
spawn do
@vhost.closed.when_false.receive?
drop_redelivered
end
return true
end
when "federation-upstream"
Expand Down Expand Up @@ -512,6 +521,10 @@ module LavinMQ::AMQP
class Closed < Exception; end

def publish(msg : Message) : Bool
publish_internal(msg)
end

protected def publish_internal(msg : Message, dlx_context : Argument::DeadLettering::Context? = nil) : Bool
return false if @deleted || @state.closed?
if d = @deduper
if d.duplicate?(msg)
Expand All @@ -523,9 +536,9 @@ module LavinMQ::AMQP
reject_on_overflow(msg)
@msg_store_lock.synchronize do
@msg_store.push(msg)
drop_overflow(dlx_context)
end
@publish_count.add(1, :relaxed)
drop_overflow_if_no_immediate_delivery
true
rescue ex : MessageStore::Error
@log.error(ex) { "Queue closed due to error" }
Expand All @@ -550,18 +563,20 @@ module LavinMQ::AMQP
end
end

private def drop_overflow_if_no_immediate_delivery : Nil
drop_overflow if (@max_length || @max_length_bytes) && !immediate_delivery?
end
# ameba:disable Metrics/CyclomaticComplexity
private def drop_overflow(dlx_context : Argument::DeadLettering::Context? = nil) : Nil
return unless (ml = @max_length) || (mlb = @max_length_bytes)
# Special case when a limit is set to 0 and a consumer accepts, the messages
# should be delivered instantly
return if ((ml == 0) || (mlb == 0)) && immediate_delivery?

private def drop_overflow : Nil
counter = 0
if ml = @max_length
@msg_store_lock.synchronize do
while @msg_store.size > ml
env = @msg_store.shift? || break
@log.debug { "Overflow drop head sp=#{env.segment_position}" }
expire_msg(env, :maxlen)
expire_msg(env, :maxlen, dlx_context)
counter &+= 1
if counter >= 16 * 1024
Fiber.yield
Expand All @@ -576,7 +591,7 @@ module LavinMQ::AMQP
while @msg_store.bytesize > mlb
env = @msg_store.shift? || break
@log.debug { "Overflow drop head sp=#{env.segment_position}" }
expire_msg(env, :maxlenbytes)
expire_msg(env, :maxlenbytes, dlx_context)
counter &+= 1
if counter >= 16 * 1024
Fiber.yield
Expand Down Expand Up @@ -668,24 +683,24 @@ module LavinMQ::AMQP
@log.info { "Expired #{i} messages" } if i > 0
end

private def expire_msg(sp : SegmentPosition, reason : Symbol)
private def expire_msg(sp : SegmentPosition, reason : Symbol, dlx_context : Argument::DeadLettering::Context? = nil)
if sp.has_dlx? || @dead_letter.dlx
msg = @msg_store_lock.synchronize { @msg_store[sp] }
env = Envelope.new(sp, msg, false)
expire_msg(env, reason)
expire_msg(env, reason, dlx_context)
else
delete_message sp
end
end

private def expire_msg(env : Envelope, reason : Symbol)
private def expire_msg(env : Envelope, reason : Symbol, dlx_context : Argument::DeadLettering::Context? = nil)
sp = env.segment_position
msg = env.message
@log.debug { "Expiring #{sp} now due to #{reason}" }

@dead_letter.route(msg, reason)

delete_message sp
@dead_letter.route(msg, reason, dlx_context) do
delete_message sp
end
end

private def expire_queue : Bool
Expand Down Expand Up @@ -820,7 +835,7 @@ module LavinMQ::AMQP
@msg_store_lock.synchronize do
@msg_store.requeue(sp)
end
drop_overflow_if_no_immediate_delivery
drop_overflow
end
else
expire_msg(sp, :rejected)
Expand Down
6 changes: 5 additions & 1 deletion src/lavinmq/amqp/stream/stream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ module LavinMQ::AMQP
@msg_store.as(StreamMessageStore)
end

# save message id / segment position
def publish(msg : Message) : Bool
publish_internal(msg, nil)
end

# save message id / segment position
protected def publish_internal(msg : Message, dlx_context : Argument::DeadLettering::Context?) : Bool
return false if @state.closed?
@msg_store_lock.synchronize do
@msg_store.push(msg)
Expand Down
Loading