Skip to content

Commit d8ebe28

Browse files
committed
Add sync mechanism between stop function and consumer thread to avoid race conditions
1 parent db21153 commit d8ebe28

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

lib/logstash/inputs/rabbitmq.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ module Inputs
5959
class RabbitMQ < LogStash::Inputs::Threadable
6060

6161
java_import java.util.concurrent.TimeUnit
62+
java_import java.util.concurrent.CountDownLatch
6263

6364
include ::LogStash::PluginMixins::RabbitMQConnection
6465

@@ -189,6 +190,7 @@ def setup!
189190
connect!
190191
declare_queue!
191192
bind_exchange!
193+
@terminated = CountDownLatch.new(1)
192194
@hare_info.channel.prefetch = @prefetch_count
193195
rescue => e
194196
# when encountering an exception during shut-down,
@@ -268,7 +270,11 @@ def internal_queue_consume!
268270
next
269271
end
270272

271-
break if payload == INTERNAL_QUEUE_POISON
273+
if payload == INTERNAL_QUEUE_POISON
274+
@logger.info("RabbitMQ consumer thread received shutdown signal, exiting")
275+
@terminated.countDown
276+
break
277+
end
272278

273279
metadata, data = payload
274280
@codec.decode(data) do |event|
@@ -302,11 +308,22 @@ def decorate(event, metadata, data)
302308
end
303309

304310
def stop
305-
@internal_queue&.put(INTERNAL_QUEUE_POISON)
311+
stop_consumer_thread
306312
shutdown_consumer
307313
close_connection
308314
end
309315

316+
def stop_consumer_thread
317+
# After sending the poison pill, we need to wait for the consumer thread to actually exit.
318+
# Closing channel right away may lead to messages not being acked properly or a race condition
319+
# where the consumer thread tries to ack a message on a closed channel.
320+
@internal_queue.put(INTERNAL_QUEUE_POISON)
321+
timed_out = !@terminated.await(2, TimeUnit::SECONDS)
322+
if timed_out
323+
@logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate")
324+
end
325+
end
326+
310327
def shutdown_consumer
311328
# There are two possible flows to shutdown consumers. When the plugin is the one shutting down, it should send a channel
312329
# cancellation message by invoking channel.basic_cancel(consumer_tag) and waiting for the consumer to terminate

0 commit comments

Comments
 (0)