Skip to content

Commit 6fe9ff7

Browse files
author
Nitin Goel
committed
Incorporating the review comments for the dynamic prefix based s3 outputs
1 parent 6519235 commit 6fe9ff7

File tree

1 file changed

+13
-19
lines changed
  • lib/logstash/outputs

1 file changed

+13
-19
lines changed

lib/logstash/outputs/s3.rb

+13-19
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
require "thread"
99
require "tmpdir"
1010
require "fileutils"
11-
require 'pathname'
11+
require "pathname"
1212

1313

1414
# INFORMATION:
@@ -60,7 +60,7 @@
6060
# size_file => 2048 (optional)
6161
# time_file => 5 (optional)
6262
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
63-
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that)
63+
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that)
6464
# }
6565
#
6666
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -144,8 +144,8 @@ def write_on_bucket(file)
144144
# find and use the bucket
145145
bucket = @s3.buckets[@bucket]
146146

147-
first = Pathname.new @temporary_directory
148-
second = Pathname.new file
147+
first = Pathname.new(@temporary_directory)
148+
second = Pathname.new(file)
149149

150150
remote_filename_path = second.relative_path_from first
151151

@@ -176,11 +176,9 @@ def create_temporary_file(prefix)
176176
@tempfile[prefix].close
177177
end
178178

179-
if @prefixes.include? prefix
179+
if @prefixes.include?(prefix)
180180
dirname = File.dirname(filename)
181-
unless File.directory?(dirname)
182-
FileUtils.mkdir_p(dirname)
183-
end
181+
FileUtils.mkdir_p(dirname) unless File.directory?(dirname)
184182
@logger.debug("S3: Creating a new temporary file", :filename => filename)
185183
@tempfile[prefix] = File.open(filename, "a")
186184
end
@@ -215,8 +213,6 @@ def register
215213

216214
test_s3_write
217215
restore_from_crashes if @restore == true
218-
#reset_page_counter
219-
#create_temporary_file
220216
configure_periodic_rotation if time_file != 0
221217
configure_upload_workers
222218

@@ -259,7 +255,7 @@ def restore_from_crashes
259255
end
260256

261257
public
262-
def shouldcleanup(prefix)
258+
def need_cleanup?(prefix)
263259
return @empty_uploads[prefix] > @no_event_wait
264260
end
265261

@@ -270,8 +266,7 @@ def move_file_to_bucket(file)
270266

271267
basepath = Pathname.new @temporary_directory
272268
dirname = Pathname.new File.dirname(file)
273-
prefixpath = dirname.relative_path_from basepath
274-
prefix = prefixpath.to_s
269+
prefix = dirname.relative_path_from(basepath).to_s
275270
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
276271

277272
if !File.zero?(file)
@@ -297,9 +292,8 @@ def move_file_to_bucket(file)
297292
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
298293
end
299294

300-
if shouldcleanup(prefix)
301-
cleanprefix(prefix)
302-
end
295+
clean_prefix(prefix) if need_cleanup?(prefix)
296+
303297
end
304298

305299
public
@@ -358,7 +352,7 @@ def close()
358352
shutdown_upload_workers
359353
@periodic_rotation_thread.stop! if @periodic_rotation_thread
360354

361-
for prefix in @prefixes
355+
@prefixes.each do |prefix|
362356
@file_rotation_lock[prefix].synchronize do
363357
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
364358
end
@@ -374,7 +368,7 @@ def shutdown_upload_workers
374368
private
375369
def handle_event(encoded_event, event)
376370
actualprefix = event.sprintf(@prefix)
377-
if not @prefixes.to_a().include? actualprefix
371+
if !@prefixes.include? actualprefix
378372
@file_rotation_lock[actualprefix] = Mutex.new
379373
@prefixes.add(actualprefix)
380374
reset_page_counter(actualprefix)
@@ -416,7 +410,7 @@ def configure_periodic_rotation
416410
end
417411

418412
private
419-
def cleanprefix(prefix)
413+
def clean_prefix(prefix)
420414
path = File.join(@temporary_directory, prefix)
421415
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
422416
@file_rotation_lock[prefix].synchronize do

0 commit comments

Comments
 (0)