Skip to content

Commit d15551d

Browse files
committed
test send ack if channel is open
1 parent db21153 commit d15551d

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

lib/logstash/inputs/rabbitmq.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ module Inputs
5959
class RabbitMQ < LogStash::Inputs::Threadable
6060

6161
java_import java.util.concurrent.TimeUnit
62+
java_import java.util.concurrent.CountDownLatch
6263

6364
include ::LogStash::PluginMixins::RabbitMQConnection
6465

@@ -189,6 +190,7 @@ def setup!
189190
connect!
190191
declare_queue!
191192
bind_exchange!
193+
@terminated = CountDownLatch.new(1)
192194
@hare_info.channel.prefetch = @prefetch_count
193195
rescue => e
194196
# when encountering an exception during shut-down,
@@ -260,15 +262,20 @@ def internal_queue_consume!
260262
while true
261263
payload = @internal_queue.poll(10, TimeUnit::MILLISECONDS)
262264
if !payload # Nothing in the queue
263-
if last_delivery_tag # And we have unacked stuff
265+
puts "payload null, checking delivery tag"
266+
if last_delivery_tag # And we have unacked stuff
267+
puts"Sending ack for last_delivery_tag #{last_delivery_tag}"
264268
@hare_info.channel.ack(last_delivery_tag, true) if @ack
265269
i=0
266270
last_delivery_tag = nil
267271
end
268272
next
269273
end
270-
271-
break if payload == INTERNAL_QUEUE_POISON
274+
if payload == INTERNAL_QUEUE_POISON
275+
puts "Received poison pill, shutting down internal queue consumer"
276+
@terminated.countDown
277+
break
278+
end
272279

273280
metadata, data = payload
274281
@codec.decode(data) do |event|
@@ -302,7 +309,12 @@ def decorate(event, metadata, data)
302309
end
303310

304311
def stop
305-
@internal_queue&.put(INTERNAL_QUEUE_POISON)
312+
puts "Sending Poison pill"
313+
@internal_queue.put(INTERNAL_QUEUE_POISON)
314+
timed_out = !@terminated.await(2, TimeUnit::SECONDS)
315+
if timed_out
316+
@logger.warn("Timeout waiting for RabbitMQ consumer to terminate")
317+
end
306318
shutdown_consumer
307319
close_connection
308320
end
@@ -315,7 +327,7 @@ def shutdown_consumer
315327
# internally by the client, unregistering the consumer and then invoking the :on_cancellation callback. In that case, the plugin
316328
# should not do anything as the consumer is already cancelled/unregistered.
317329
return if !@consumer || @consumer.cancelled? || @consumer.terminated?
318-
330+
puts "shutting down consumer"
319331
@hare_info.channel.basic_cancel(@consumer.consumer_tag)
320332
connection = @hare_info.connection
321333
until @consumer.terminated?

lib/logstash/plugin_mixins/rabbitmq_connection.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def conn_str
8686

8787
def close_connection
8888
@rabbitmq_connection_stopping = true
89+
puts "closing channel"
8990
@hare_info.channel.close if channel_open?
9091
@hare_info.connection.close if connection_open?
9192
end

0 commit comments

Comments
 (0)