Skip to content

Commit 850d822

Browse files
committed
Add sync mechanism between stop function and consumer thread to avoid race conditions
1 parent db21153 commit 850d822

File tree

1 file changed

+20
-5
lines changed

1 file changed

+20
-5
lines changed

lib/logstash/inputs/rabbitmq.rb

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ module Inputs
5959
class RabbitMQ < LogStash::Inputs::Threadable
6060

6161
java_import java.util.concurrent.TimeUnit
62+
java_import java.util.concurrent.CountDownLatch
6263

6364
include ::LogStash::PluginMixins::RabbitMQConnection
6465

@@ -189,6 +190,7 @@ def setup!
189190
connect!
190191
declare_queue!
191192
bind_exchange!
193+
@terminated = CountDownLatch.new(1)
192194
@hare_info.channel.prefetch = @prefetch_count
193195
rescue => e
194196
# when encountering an exception during shut-down,
@@ -260,15 +262,18 @@ def internal_queue_consume!
260262
while true
261263
payload = @internal_queue.poll(10, TimeUnit::MILLISECONDS)
262264
if !payload # Nothing in the queue
263-
if last_delivery_tag # And we have unacked stuff
265+
if last_delivery_tag # And we have unacked stuff
264266
@hare_info.channel.ack(last_delivery_tag, true) if @ack
265267
i=0
266268
last_delivery_tag = nil
267269
end
268270
next
269271
end
270-
271-
break if payload == INTERNAL_QUEUE_POISON
272+
if payload == INTERNAL_QUEUE_POISON
273+
@logger.info("RabbitMQ consumer thread received shutdown signal, exiting")
274+
@terminated.countDown
275+
break
276+
end
272277

273278
metadata, data = payload
274279
@codec.decode(data) do |event|
@@ -302,11 +307,22 @@ def decorate(event, metadata, data)
302307
end
303308

304309
def stop
305-
@internal_queue&.put(INTERNAL_QUEUE_POISON)
310+
stop_consumer_thread
306311
shutdown_consumer
307312
close_connection
308313
end
309314

315+
def stop_consumer_thread
316+
# After sending the poison pill, we need to wait for the consumer thread to actually exit.
317+
# Closing channel right away may lead to messages not being acked properly or a race condition
318+
# where the consumer thread tries to ack a message on a closed channel.
319+
@internal_queue.put(INTERNAL_QUEUE_POISON)
320+
timed_out = !@terminated.await(2, TimeUnit::SECONDS)
321+
if timed_out
322+
@logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate")
323+
end
324+
end
325+
310326
def shutdown_consumer
311327
# There are two possible flows to shutdown consumers. When the plugin is the one shutting down, it should send a channel
312328
# cancellation message by invoking channel.basic_cancel(consumer_tag) and waiting for the consumer to terminate
@@ -315,7 +331,6 @@ def shutdown_consumer
315331
# internally by the client, unregistering the consumer and then invoking the :on_cancellation callback. In that case, the plugin
316332
# should not do anything as the consumer is already cancelled/unregistered.
317333
return if !@consumer || @consumer.cancelled? || @consumer.terminated?
318-
319334
@hare_info.channel.basic_cancel(@consumer.consumer_tag)
320335
connection = @hare_info.connection
321336
until @consumer.terminated?

0 commit comments

Comments
 (0)