
Commit d64a88f

Refactor: reduce locking while appending events

Parent: 94fd9fd


lib/logstash/outputs/kafka.rb (9 additions, 12 deletions)
@@ -1,6 +1,7 @@
 require 'logstash/namespace'
 require 'logstash/outputs/base'
 require 'java'
+require 'concurrent/map'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'

@@ -190,7 +191,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base

   public
   def register
-    @thread_batch_map = Concurrent::Hash.new
+    @thread_batch_map = Concurrent::Map.new

     if !@retries.nil?
       if @retries < 0
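
The core change above swaps Concurrent::Hash for Concurrent::Map. On JRuby, where this plugin runs, Concurrent::Hash is a Hash made thread-safe by a single coarse lock, so every batch lookup contends with every other thread; Concurrent::Map is modeled on java.util.concurrent.ConcurrentHashMap and lets readers proceed largely without blocking. A minimal sketch (not part of the commit) of the Map accessors the new code relies on:

    require 'concurrent/map'

    map = Concurrent::Map.new
    map[:worker] = []             # store a per-worker batch
    map.get(:worker) << 'record'  # `get` is the read used in append_record
    puts map.get(:worker).size    #=> 1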
@@ -204,34 +205,30 @@ def register
     @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data)
+        push_event_data(event, data)
       end
     elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data.to_java_bytes)
+        push_event_data(event, data.to_java_bytes)
       end
     else
       raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
   end

-  def prepare(record)
+  def append_record(record)
     # This output is threadsafe, so we need to keep a batch per thread.
-    @thread_batch_map[Thread.current].add(record)
+    @thread_batch_map.get(Thread.current) << record
   end

   def multi_receive(events)
-    t = Thread.current
-    if !@thread_batch_map.include?(t)
-      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
-    end
+    batch = @thread_batch_map.fetch_or_store(Thread.current) { Array.new(events.size).clear }

     events.each do |event|
       break if event == LogStash::SHUTDOWN
       @codec.encode(event)
     end

-    batch = @thread_batch_map[t]
     if batch.any?
       retrying_send(batch)
       batch.clear
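
The multi_receive rewrite replaces a non-atomic check-then-insert with a single fetch_or_store call, which returns the batch already stored for the current thread or stores the block's result if none exists. Array.new(events.size).clear yields an empty plain Ruby Array (the initial sizing is at most a capacity hint) in place of the old java.util.ArrayList. A hedged illustration of the lookup semantics:

    require 'concurrent/map'

    batches = Concurrent::Map.new

    first = batches.fetch_or_store(Thread.current) { [] }
    first << 'event-1'

    second = batches.fetch_or_store(Thread.current) { [] }
    puts first.equal?(second)  #=> true, the block did not run again
    puts second.size           #=> 1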
@@ -315,13 +312,13 @@ def handle_kafka_error(e, record)
     end
   end

-  def write_to_kafka(event, serialized_data)
+  def push_event_data(event, serialized_data)
     if @message_key.nil?
       record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
-    prepare(record)
+    append_record(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
   rescue => e
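
Taken together, each pipeline worker thread now appends into its own batch keyed by Thread.current, so threads never contend on one another's batch contents. A rough demo of that property (names and counts invented here, not the plugin's test suite):

    require 'concurrent/map'

    thread_batch_map = Concurrent::Map.new

    threads = Array.new(2) do |i|
      Thread.new do
        batch = thread_batch_map.fetch_or_store(Thread.current) { [] }
        10.times { |n| batch << "worker-#{i}-record-#{n}" }
      end
    end
    threads.each(&:join)

    thread_batch_map.each_pair do |thread, batch|
      puts "#{batch.size} records for #{thread.inspect}"  # 10 each
    end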
