Skip to content

Merge all other contributions #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 73 additions & 18 deletions lib/filewatch/tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ def initialize(opts={})
@watch = FileWatch::Watch.new
@watch.logger = @logger
@sincedb = {}
@sincedb_last_write = 0
@sincedb_last_write = Time.now.to_i
@sincedb_write_pending = false
@sincedb_writing = false
@statcache = {}
@opts = {
:sincedb_write_interval => 10,
:stat_interval => 1,
:discover_interval => 5,
:exclude => [],
:start_new_files_at => :end
:start_new_files_at => :end,
:follow_only_path => false
}.merge(opts)
if [email protected]?(:sincedb_path)
@opts[:sincedb_path] = File.join(ENV["HOME"], ".sincedb") if ENV.include?("HOME")
Expand Down Expand Up @@ -96,6 +99,9 @@ def subscribe(&block)
@files[path].close
@files.delete(path)
@statcache.delete(path)
when :noupdate
@logger.debug(":noupdate for #{path}, from @files")
_sincedb_write_if_pending # will check to see if sincedb_write requests are pending
else
@logger.warn("unknown event type #{event} for #{path}")
end
Expand Down Expand Up @@ -128,13 +134,21 @@ def _open_file(path, event)

stat = File::Stat.new(path)

if @iswindows
fileId = Winhelper.GetWindowsUniqueFileIdentifier(path)
inode = [fileId, stat.dev_major, stat.dev_minor]
if @opts[:follow_only_path]
# In cases where files are rsynced to the consuming server, inodes will change when
# updated files overwrite original ones, resulting in inode changes. In order to
# avoid having the sincedb.member check from failing in this scenario, we'll
# construct the inode key using the path which will be 'stable'
inode = [path, stat.dev_major, stat.dev_minor]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated my PR with this change to better handle some uncommon (but legal) path characters in linux:

michio-nikaido@0e3e6c2

I don't see it here, so I wanted to point that out because an embedded space would otherwise throw off the line parsing for sincedb

else
inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor]
if @iswindows
fileId = Winhelper.GetWindowsUniqueFileIdentifier(path)
inode = [fileId, stat.dev_major, stat.dev_minor]
else
inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor]
end

end

@statcache[path] = inode

if @sincedb.member?(inode)
Expand Down Expand Up @@ -186,20 +200,14 @@ def _read_file(path, &block)
end

if changed
now = Time.now.to_i
delta = now - @sincedb_last_write
if delta >= @opts[:sincedb_write_interval]
@logger.debug("writing sincedb (delta since last write = #{delta})")
_sincedb_write
@sincedb_last_write = now
end
_sincedb_write
end
end # def _read_file

public
def sincedb_write(reason=nil)
@logger.debug("caller requested sincedb write (#{reason})")
_sincedb_write
_sincedb_write(true) # since this is an external request, force the write
end

private
Expand All @@ -214,26 +222,67 @@ def _sincedb_open

@logger.debug("_sincedb_open: reading from #{path}")
db.each do |line|
ino, dev_major, dev_minor, pos = line.split(" ", 4)
ino, dev_major, dev_minor, pos = line.split("*", 4)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a backwards incompatible change and we cannot do this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but we need to find a way to support space in path.
It's a must have feature on windows.

inode = [ino, dev_major.to_i, dev_minor.to_i]
@logger.debug("_sincedb_open: setting #{inode.inspect} to #{pos.to_i}")
@sincedb[inode] = pos.to_i
end
db.close
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch; though this would be automatically closed via GC (not tha we should rely on it).

However, this should be closed in an ensure statement. I"ll fix this in the merge process

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In real life, I found some time the GC doesn't close it and get an error on next call (can't open two time the file).

end # def _sincedb_open

private
def _sincedb_write
def _sincedb_write_if_pending
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"write if pending" can be written in ruby as:

_sincedb_write if @sincedb_write_pending

Why have it as a separate method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because _sincedb_write can be call from external and must write now and not later.


# Check to see if sincedb should be written out since there was a file read after the sincedb flush,
# and during the sincedb_write_interval

if @sincedb_write_pending
_sincedb_write
end
end

private
def _sincedb_write(sincedb_force_write=false)

# This routine will only write out sincedb if enough time has passed based on @sincedb_write_interval
# If it hasn't and we were asked to write, then we are pending a write.

# if we were called with force == true, then we have to write sincedb and bypass a time check
# ie. external caller calling the public sincedb_write method

if(@sincedb_writing)
@logger.warn("_sincedb_write already writing")
return
end

@sincedb_writing = true

if (!sincedb_force_write)
now = Time.now.to_i
delta = now - @sincedb_last_write

# we will have to flush out the sincedb file after the interval expires. So, we will try again later.
if delta < @opts[:sincedb_write_interval]
@sincedb_write_pending = true
@sincedb_writing = false
return
end
end

@logger.debug("writing sincedb (delta since last write = #{delta})")

path = @opts[:sincedb_path]
tmp = "#{path}.new"
begin
db = File.open(tmp, "w")
rescue => e
@logger.warn("_sincedb_write failed: #{tmp}: #{e}")
@sincedb_writing = false
return
end

@sincedb.each do |inode, pos|
db.puts([inode, pos].flatten.join(" "))
db.puts([inode, pos].flatten.join("*"))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a backwards-incompatible change and we cannot do this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but we need to find a way to support space in path.
It's a must have feature on windows.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is related, but if the delimiter change was introduced because of my PR (#34) I updated the PR with a crude encoding that allows spaces to be in the paths (see michio-nikaido@0e3e6c2)

end
db.close

Expand All @@ -242,6 +291,12 @@ def _sincedb_write
rescue => e
@logger.warn("_sincedb_write rename/sync failed: #{tmp} -> #{path}: #{e}")
end

@sincedb_last_write = now
@sincedb_write_pending = false
@sincedb_writing = false

System.gc()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do this GC call?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also,I did not add this below so I assume you are talking to the group

Sent from my iPad

On Aug 1, 2014, at 2:11 AM, Jordan Sissel [email protected] wrote:

In lib/filewatch/tail.rb:

@@ -242,6 +291,12 @@ def _sincedb_write
rescue => e
@logger.warn("_sincedb_write rename/sync failed: #{tmp} -> #{path}: #{e}")
end
+

  •  @sincedb_last_write = now
    
  •  @sincedb_write_pending = false
    
  •  @sincedb_writing = false
    
  •  System.gc()
    
    Why do this GC call?


Reply to this email directly or view it on GitHub.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it to be sure we regulary clean the memory.
I use this gem on logstash and we can have memory problem during mass write in logs.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some context here, I tried this updated tail.rb code in Logstash and it fails with the following error:

image

In order for it to run in Logstash, I have to add the following 2 lines to the beginning of the file so it knows what System is.

require "java"
java_import "java.lang.System"

Certainly, curious if the GC call is required since the existing Logstash filewatch tail.rb doesn't have it.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never found it necessary to invoke GC manually in Logstash, so without knowing more details, I'll remove this before merging.

end # def _sincedb_write

public
Expand Down
13 changes: 9 additions & 4 deletions lib/filewatch/watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def each(&block)
else
inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor]
end

if inode != @files[path][:inode]
@logger.debug("#{path}: old inode was #{@files[path][:inode].inspect}, new is #{inode.inspect}")
yield(:delete, path)
Expand All @@ -90,6 +90,11 @@ def each(&block)
elsif stat.size > @files[path][:size]
@logger.debug("#{path}: file grew, old size #{@files[path][:size]}, new size #{stat.size}")
yield(:modify, path)
else
# since there is no update, we should pass control back in case the caller needs to do any work
# otherwise, they can ONLY do other work when a file is created or modified
@logger.debug("#{path}: nothing to update")
yield(:noupdate, path)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a no-op event doesn't feel like it fits well. filewatch isn't intended to be a ticking event system, seems like sincedb syncing should be done in a separate thread instead of using this method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not agree, because if both thread are not perfectly sync, you can lost or duplicate some data after a stop/start.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are any questions with my update, I can answer then very specifically when I get back to the hotel tonight.

Unfortunately, I was not certain that the git had the latest code because I found differences with the same gem code.  I questioned this months ago but never heard back.   I will help in any way you would like .  Just let me know

Sent from Yahoo Mail on Android

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow that.   Which threads see not on sync...tail and watch?  All this is doing is letting one give up control so it doesn't lock out the caller....otherwise sincedb can become stale.   I have tested this for months without an issue...where prior start and stop constantly were having issues because sincedb was not flushed out.   I don't have the code in front of me because I am away on vacation but in the the release, I also fixed a start stop timing issue that is documented in the update . Maybe this is what you are referring to...but hard to tell from the statement

Sent from Yahoo Mail on Android

end

@files[path][:size] = stat.size
Expand Down Expand Up @@ -152,14 +157,14 @@ def _discover_file(path, initial=false)
:inode => [stat.ino, stat.dev_major, stat.dev_minor],
:create_sent => false,
}
if @iswindows

if @iswindows
fileId = Winhelper.GetWindowsUniqueFileIdentifier(path)
@files[file][:inode] = [fileId, stat.dev_major, stat.dev_minor]
else
@files[file][:inode] = [stat.ino.to_s, stat.dev_major, stat.dev_minor]
end

if initial
@files[file][:initial] = true
end
Expand Down