Skip to content

Merge all other contributions #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions lib/filewatch/tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ def initialize(opts={})
@watch = FileWatch::Watch.new
@watch.logger = @logger
@sincedb = {}
@sincedb_last_write = 0
@sincedb_last_write = Time.now.to_i
@sincedb_write_pending = true
@sincedb_writing = false
@statcache = {}
@opts = {
:sincedb_write_interval => 10,
:stat_interval => 1,
:discover_interval => 5,
:exclude => [],
:start_new_files_at => :end,
:follow_only_path => false,
:delimiter => "\n"
}.merge(opts)
if [email protected]?(:sincedb_path)
Expand All @@ -53,6 +56,7 @@ def initialize(opts={})
if [email protected]?(:sincedb_path)
raise NoSinceDBPathGiven.new("No HOME or SINCEDB_PATH set in environment. I need one of these set so I can keep track of the files I am following.")
end
@watch.follow_only_path = @opts[:follow_only_path]
@watch.exclude(@opts[:exclude])

_sincedb_open
Expand Down Expand Up @@ -101,6 +105,9 @@ def subscribe(&block)
@files.delete(path)
inode = @statcache.delete(path)
@sincedb.delete(inode)
when :noupdate
@logger.debug? && @logger.debug(":noupdate for #{path}, from @files")
_sincedb_write_if_pending # will check to see if sincedb_write requests are pending
else
@logger.warn("unknown event type #{event} for #{path}")
end
Expand Down Expand Up @@ -129,7 +136,7 @@ def _open_file(path, event)
# and might be spammy.
now = Time.now.to_i
if now - @lastwarn[path] > OPEN_WARN_INTERVAL
@logger.warn("failed to open #{path}: #{$!}")
@logger.warn? && @logger.warn("failed to open #{path}: #{$!}")
@lastwarn[path] = now
else
@logger.debug? && @logger.debug("(warn supressed) failed to open #{path}: #{$!}")
Expand Down Expand Up @@ -164,6 +171,7 @@ def _open_file(path, event)
end
else
@logger.debug? && @logger.debug("#{path}: staying at position 0, no sincedb")
@sincedb[sincedb_record_uid] = 0
end

return true
Expand All @@ -188,20 +196,14 @@ def _read_file(path, &block)
end

if changed
now = Time.now.to_i
delta = now - @sincedb_last_write
if delta >= @opts[:sincedb_write_interval]
@logger.debug? && @logger.debug("writing sincedb (delta since last write = #{delta})")
_sincedb_write
@sincedb_last_write = now
end
_sincedb_write
end
end # def _read_file

public
def sincedb_write(reason=nil)
@logger.debug? && @logger.debug("caller requested sincedb write (#{reason})")
_sincedb_write
_sincedb_write(true) # since this is an external request, force the write
end

private
Expand All @@ -218,6 +220,7 @@ def _sincedb_open
@logger.debug? && @logger.debug("_sincedb_open: reading from #{path}")
db.each do |line|
ino, dev_major, dev_minor, pos = line.split(" ", 4)
inode = [ino, dev_major.to_i, dev_minor.to_i]
sincedb_record_uid = [ino, dev_major.to_i, dev_minor.to_i]
@logger.debug? && @logger.debug("_sincedb_open: setting #{sincedb_record_uid.inspect} to #{pos.to_i}")
@sincedb[sincedb_record_uid] = pos.to_i
Expand All @@ -226,19 +229,70 @@ def _sincedb_open
end # def _sincedb_open

private
def _sincedb_write
def _sincedb_write_if_pending
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"write if pending" can be written in ruby as:

_sincedb_write if @sincedb_write_pending

Why have it as a separate method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because _sincedb_write can be call from external and must write now and not later.


# Check to see if sincedb should be written out since there was a file read after the sincedb flush,
# and during the sincedb_write_interval

if @sincedb_write_pending
_sincedb_write
end
end

private
def _sincedb_write(sincedb_force_write=false)

# This routine will only write out sincedb if enough time has passed based on @sincedb_write_interval
# If it hasn't and we were asked to write, then we are pending a write.

# if we were called with force == true, then we have to write sincedb and bypass a time check
# ie. external caller calling the public sincedb_write method

if(@sincedb_writing)
@logger.warn? && @logger.warn("_sincedb_write already writing")
return
end

@sincedb_writing = true

if (!sincedb_force_write)
now = Time.now.to_i
delta = now - @sincedb_last_write

# we will have to flush out the sincedb file after the interval expires. So, we will try again later.
if delta < @opts[:sincedb_write_interval]
@sincedb_write_pending = true
@sincedb_writing = false
return
end
end

@logger.debug? && @logger.debug("writing sincedb (delta since last write = #{delta})")

path = @opts[:sincedb_path]
if File.device?(path)
IO.write(path, serialize_sincedb, 0)
else
File.atomic_write(path) {|file| file.write(serialize_sincedb) }
begin
if File.device?(path)
IO.write(path, serialize_sincedb, 0)
else
File.atomic_write(path) {|file| file.write(serialize_sincedb) }
end
rescue => e
@logger.warn("_sincedb_write failed: #{tmp}: #{e}")
@sincedb_writing = false
return
end

@sincedb_last_write = now
@sincedb_write_pending = false
@sincedb_writing = false

System.gc()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do this GC call?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also,I did not add this below so I assume you are talking to the group

Sent from my iPad

On Aug 1, 2014, at 2:11 AM, Jordan Sissel [email protected] wrote:

In lib/filewatch/tail.rb:

@@ -242,6 +291,12 @@ def _sincedb_write
rescue => e
@logger.warn("_sincedb_write rename/sync failed: #{tmp} -> #{path}: #{e}")
end
+

  •  @sincedb_last_write = now
    
  •  @sincedb_write_pending = false
    
  •  @sincedb_writing = false
    
  •  System.gc()
    
    Why do this GC call?


Reply to this email directly or view it on GitHub.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it to be sure we regulary clean the memory.
I use this gem on logstash and we can have memory problem during mass write in logs.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some context here, I tried this updated tail.rb code in Logstash and it fails with the following error:

image

In order for it to run in Logstash, I have to add the following 2 lines to the beginning of the file so it knows what System is.

require "java"
java_import "java.lang.System"

Certainly, curious if the GC call is required since the existing Logstash filewatch tail.rb doesn't have it.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never found it necessary to invoke GC manually in Logstash, so without knowing more details, I'll remove this before merging.

end # def _sincedb_write

public
def quit
_sincedb_write
@watch.quit
_sincedb_write(true)
end # def quit

private
Expand Down
35 changes: 30 additions & 5 deletions lib/filewatch/watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ def initialize(opts={})
@logger = Logger.new(STDERR)
@logger.level = Logger::INFO
end
@follow_only_path = false
@watching = []
@exclude = []
@files = Hash.new { |h, k| h[k] = Hash.new }
end # def initialize

public
def follow_only_path=(follow_only_path)
@follow_only_path = follow_only_path
end

public
def logger=(logger)
@logger = logger
Expand All @@ -37,17 +43,31 @@ def watch(path)
@watching << path
_discover_file(path, true)
end

return true
end # def watch

public
def inode(path,stat)
if @iswindows
fileId = Winhelper.GetWindowsUniqueFileIdentifier(path)
inode = [fileId, stat.dev_major, stat.dev_minor]
if @follow_only_path
# In cases where files are rsynced to the consuming server, inodes will change when
# updated files overwrite original ones, resulting in inode changes. In order to
# avoid having the sincedb.member check from failing in this scenario, we'll
# construct the inode key using the path which will be 'stable'
#
# Because spaces and carriage returns are valid characters in linux paths, we have
# to take precautions to avoid having them show up in the .sincedb where they would
# derail any parsing that occurs in _sincedb_open. Since NULL (\0) is NOT a
# valid path character in LINUX (one of the few), we'll replace these troublesome
# characters with 'encodings' that won't be encountered in a normal path but will
# be handled properly by __sincedb_open
inode = [path.gsub(/ /, "\0\0").gsub(/\n/, "\0\1"), stat.dev_major, stat.dev_minor]
else
inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor]
if @iswindows
fileId = Winhelper.GetWindowsUniqueFileIdentifier(path)
inode = [fileId, stat.dev_major, stat.dev_minor]
else
inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor]
end
end
return inode
end
Expand Down Expand Up @@ -95,6 +115,11 @@ def each(&block)
elsif stat.size > @files[path][:size]
@logger.debug? && @logger.debug("#{path}: file grew, old size #{@files[path][:size]}, new size #{stat.size}")
yield(:modify, path)
else
# since there is no update, we should pass control back in case the caller needs to do any work
# otherwise, they can ONLY do other work when a file is created or modified
@logger.debug? && @logger.debug("#{path}: nothing to update")
yield(:noupdate, path)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a no-op event doesn't feel like it fits well. filewatch isn't intended to be a ticking event system, seems like sincedb syncing should be done in a separate thread instead of using this method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not agree, because if both thread are not perfectly sync, you can lost or duplicate some data after a stop/start.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are any questions with my update, I can answer then very specifically when I get back to the hotel tonight.

Unfortunately, I was not certain that the git had the latest code because I found differences with the same gem code.  I questioned this months ago but never heard back.   I will help in any way you would like .  Just let me know

Sent from Yahoo Mail on Android

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow that.   Which threads see not on sync...tail and watch?  All this is doing is letting one give up control so it doesn't lock out the caller....otherwise sincedb can become stale.   I have tested this for months without an issue...where prior start and stop constantly were having issues because sincedb was not flushed out.   I don't have the code in front of me because I am away on vacation but in the the release, I also fixed a start stop timing issue that is documented in the update . Maybe this is what you are referring to...but hard to tell from the statement

Sent from Yahoo Mail on Android

end

@files[path][:size] = stat.size
Expand Down