Skip to content

Commit 6a9a667

Browse files
author
Nitin Goel
committed
Incorporating the review comments for the dynamic prefix based s3 outputs
1 parent 1274afa commit 6a9a667

File tree

1 file changed

+13
-19
lines changed
  • lib/logstash/outputs

1 file changed

+13
-19
lines changed

lib/logstash/outputs/s3.rb

+13-19
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
require "thread"
88
require "tmpdir"
99
require "fileutils"
10-
require 'pathname'
10+
require "pathname"
1111

1212

1313
# INFORMATION:
@@ -61,7 +61,7 @@
6161
# time_file => 5 (optional)
6262
# format => "plain" (optional)
6363
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
64-
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that)
64+
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that)
6565
# }
6666
#
6767
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -145,8 +145,8 @@ def write_on_bucket(file)
145145
# find and use the bucket
146146
bucket = @s3.buckets[@bucket]
147147

148-
first = Pathname.new @temporary_directory
149-
second = Pathname.new file
148+
first = Pathname.new(@temporary_directory)
149+
second = Pathname.new(file)
150150

151151
remote_filename_path = second.relative_path_from first
152152

@@ -177,11 +177,9 @@ def create_temporary_file(prefix)
177177
@tempfile[prefix].close
178178
end
179179

180-
if @prefixes.include? prefix
180+
if @prefixes.include?(prefix)
181181
dirname = File.dirname(filename)
182-
unless File.directory?(dirname)
183-
FileUtils.mkdir_p(dirname)
184-
end
182+
FileUtils.mkdir_p(dirname) unless File.directory?(dirname)
185183
@logger.debug("S3: Creating a new temporary file", :filename => filename)
186184
@tempfile[prefix] = File.open(filename, "a")
187185
end
@@ -216,8 +214,6 @@ def register
216214

217215
test_s3_write
218216
restore_from_crashes if @restore == true
219-
#reset_page_counter
220-
#create_temporary_file
221217
configure_periodic_rotation if time_file != 0
222218
configure_upload_workers
223219

@@ -260,7 +256,7 @@ def restore_from_crashes
260256
end
261257

262258
public
263-
def shouldcleanup(prefix)
259+
def need_cleanup?(prefix)
264260
return @empty_uploads[prefix] > @no_event_wait
265261
end
266262

@@ -271,8 +267,7 @@ def move_file_to_bucket(file)
271267

272268
basepath = Pathname.new @temporary_directory
273269
dirname = Pathname.new File.dirname(file)
274-
prefixpath = dirname.relative_path_from basepath
275-
prefix = prefixpath.to_s
270+
prefix = dirname.relative_path_from(basepath).to_s
276271
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
277272

278273
if !File.zero?(file)
@@ -298,9 +293,8 @@ def move_file_to_bucket(file)
298293
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
299294
end
300295

301-
if shouldcleanup(prefix)
302-
cleanprefix(prefix)
303-
end
296+
clean_prefix(prefix) if need_cleanup?(prefix)
297+
304298
end
305299

306300
public
@@ -359,7 +353,7 @@ def teardown()
359353
shutdown_upload_workers
360354
@periodic_rotation_thread.stop! if @periodic_rotation_thread
361355

362-
for prefix in @prefixes
356+
@prefixes.each do |prefix|
363357
@file_rotation_lock[prefix].synchronize do
364358
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
365359
end
@@ -377,7 +371,7 @@ def shutdown_upload_workers
377371
private
378372
def handle_event(encoded_event, event)
379373
actualprefix = event.sprintf(@prefix)
380-
if not @prefixes.to_a().include? actualprefix
374+
if !@prefixes.include? actualprefix
381375
@file_rotation_lock[actualprefix] = Mutex.new
382376
@prefixes.add(actualprefix)
383377
reset_page_counter(actualprefix)
@@ -419,7 +413,7 @@ def configure_periodic_rotation
419413
end
420414

421415
private
422-
def cleanprefix(prefix)
416+
def clean_prefix(prefix)
423417
path = File.join(@temporary_directory, prefix)
424418
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
425419
@file_rotation_lock[prefix].synchronize do

0 commit comments

Comments
 (0)