Skip to content

Commit 7ee3df2

Browse files
authored
Merge pull request #15 from JiriKadlcik/#6_refactor_shutdown_sequence
Fix issue #6 - use missing calls from Plugin API - stop, close, stop?
2 parents e26df4d + 75ad3bd commit 7ee3df2

File tree

3 files changed

+31
-26
lines changed

3 files changed

+31
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
- Breaking: Updated plugin to use new Java Event APIs
33
- relax logstash-core-plugin-api constrains
44
- update .travis.yml
5+
- fix issue #6 - use missing calls from Plugin API - stop, close, stop?
56

67
# 2.0.4
78
- Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash

lib/logstash/inputs/jms.rb

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
#
88
# For more information about Jms, see <http://docs.oracle.com/javaee/6/tutorial/doc/bncdq.html>
99
# For more information about the Ruby Gem used, see <http://github.com/reidmorrison/jruby-jms>
10-
# Here is a config example :
10+
# Here is a config example to pull from a queue:
1111
# jms {
1212
# include_header => false
1313
# include_properties => false
1414
# include_body => true
1515
# use_jms_timestamp => false
1616
# interval => 10
17-
# queue_name => "myqueue"
17+
# destination => "myqueue"
18+
# pub-sub => false
1819
# yaml_file => "~/jms.yml"
1920
# yaml_section => "mybroker"
2021
# }
@@ -57,7 +58,7 @@ class LogStash::Inputs::Jms < LogStash::Inputs::Threadable
5758
# This parameter has non influence in the case of a subcribed Topic.
5859
config :interval, :validate => :number, :default => 10
5960

60-
# If pub-sub (topic) style should be used or not.
61+
# If pub-sub (topic) style should be used.
6162
config :pub_sub, :validate => :boolean, :default => false
6263

6364
# Name of the destination queue or topic to use.
@@ -137,7 +138,7 @@ def queue_event(msg, output_queue)
137138
if msg.java_kind_of?(JMS::MapMessage)
138139
event = LogStash::Event.new
139140
msg.data.each do |field, value|
140-
event[field.to_s] = value # TODO(claveau): needs codec.decode or converter.convert ?
141+
event.set(field.to_s, value) # TODO(claveau): needs codec.decode or converter.convert ?
141142
end
142143
elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage)
143144
@codec.decode(msg.to_s) do |event_message|
@@ -152,7 +153,7 @@ def queue_event(msg, output_queue)
152153

153154
# Here, we can use the JMS Enqueue timestamp as the @timestamp
154155
if @use_jms_timestamp && msg.jms_timestamp
155-
event.timestamp = LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000)
156+
event.set("@timestamp", LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000))
156157
end
157158

158159
if @include_header
@@ -182,20 +183,19 @@ def queue_event(msg, output_queue)
182183
def run_consumer(output_queue)
183184
JMS::Connection.session(@jms_config) do |session|
184185
destination_key = @pub_sub ? :topic_name : :queue_name
185-
while(true)
186+
while !stop?
186187
session.consume(destination_key => @destination, :timeout=>@timeout, :selector => @selector) do |message|
187188
queue_event message, output_queue
189+
break if stop?
188190
end
189191
sleep @interval
190192
end
191193
end
192-
rescue LogStash::ShutdownSignal
193-
# Do nothing, let us quit.
194194
rescue => e
195195
@logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace)
196196
sleep(10)
197-
retry
198-
end # def run
197+
retry unless stop?
198+
end # def run_consumer
199199

200200
# Consume all available messages on the queue through a listener
201201
private
@@ -210,19 +210,15 @@ def run_thread(output_queue)
210210
queue_event message, output_queue
211211
end
212212
connection.start
213-
while(true)
213+
while !stop?
214214
@logger.debug("JMS Thread sleeping ...")
215215
sleep @interval
216216
end
217-
rescue LogStash::ShutdownSignal
218-
connection.close
219217
rescue => e
220218
@logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace)
221219
sleep(10)
222-
retry
223-
end # def run
224-
225-
220+
retry unless stop?
221+
end # def run_thread
226222

227223
# Consume all available messages on the queue through a listener
228224
private
@@ -241,18 +237,16 @@ def run_async(output_queue)
241237
end
242238
# Since the on_message handler above is in a separate thread the thread needs
243239
# to do some other work. It will just sleep for 10 seconds.
244-
while(true)
240+
while !stop?
241+
@logger.debug("JMS Thread sleeping ...")
245242
sleep @interval
246243
end
247244
end
248-
rescue LogStash::ShutdownSignal
249-
# Do nothing, let us quit.
250245
rescue => e
251246
@logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace)
252247
sleep(10)
253-
retry
254-
end # def run
255-
248+
retry unless stop?
249+
end # def run_async
256250

257251
public
258252
def run(output_queue)
@@ -266,4 +260,15 @@ def run(output_queue)
266260
end
267261
end # def run
268262

263+
public
264+
def close
265+
@logger.info("Closing JMS connection")
266+
@connection.close rescue nil
267+
end # def close
268+
269+
public
270+
def stop
271+
@logger.info("Stopping JMS consumer")
272+
@connection.stop rescue nil
273+
end # def stop
269274
end # class LogStash::Inputs::Jms

logstash-input-jms.gemspec

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ Gem::Specification.new do |s|
2121

2222
# Gem dependencies
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
24-
25-
s.add_runtime_dependency 'logstash-codec-json'
26-
s.add_runtime_dependency 'logstash-codec-plain'
24+
s.add_runtime_dependency 'logstash-codec-json', '~> 3.0'
25+
s.add_runtime_dependency 'logstash-codec-plain', '~> 3.0'
2726

2827
if RUBY_PLATFORM == 'java'
2928
s.platform = RUBY_PLATFORM

0 commit comments

Comments
 (0)