diff --git a/lib/filewatch/tail.rb b/lib/filewatch/tail.rb index 0fd097f..a77ab19 100644 --- a/lib/filewatch/tail.rb +++ b/lib/filewatch/tail.rb @@ -36,7 +36,9 @@ def initialize(opts={}) @watch = FileWatch::Watch.new @watch.logger = @logger @sincedb = {} - @sincedb_last_write = 0 + @sincedb_last_write = Time.now.to_i + @sincedb_write_pending = true + @sincedb_writing = false @statcache = {} @opts = { :sincedb_write_interval => 10, @@ -44,6 +46,7 @@ def initialize(opts={}) :discover_interval => 5, :exclude => [], :start_new_files_at => :end, + :follow_only_path => false, :delimiter => "\n" }.merge(opts) if !@opts.include?(:sincedb_path) @@ -53,6 +56,7 @@ def initialize(opts={}) if !@opts.include?(:sincedb_path) raise NoSinceDBPathGiven.new("No HOME or SINCEDB_PATH set in environment. I need one of these set so I can keep track of the files I am following.") end + @watch.follow_only_path = @opts[:follow_only_path] @watch.exclude(@opts[:exclude]) _sincedb_open @@ -101,6 +105,9 @@ def subscribe(&block) @files.delete(path) inode = @statcache.delete(path) @sincedb.delete(inode) + when :noupdate + @logger.debug? && @logger.debug(":noupdate for #{path}, from @files") + _sincedb_write_if_pending # will check to see if sincedb_write requests are pending else @logger.warn("unknown event type #{event} for #{path}") end @@ -129,7 +136,7 @@ def _open_file(path, event) # and might be spammy. now = Time.now.to_i if now - @lastwarn[path] > OPEN_WARN_INTERVAL - @logger.warn("failed to open #{path}: #{$!}") + @logger.warn? && @logger.warn("failed to open #{path}: #{$!}") @lastwarn[path] = now else @logger.debug? && @logger.debug("(warn supressed) failed to open #{path}: #{$!}") @@ -164,6 +171,7 @@ def _open_file(path, event) end else @logger.debug? && @logger.debug("#{path}: staying at position 0, no sincedb") + @sincedb[sincedb_record_uid] = 0 end return true @@ -188,20 +196,14 @@ def _read_file(path, &block) end if changed - now = Time.now.to_i - delta = now - @sincedb_last_write - if delta >= @opts[:sincedb_write_interval] - @logger.debug? && @logger.debug("writing sincedb (delta since last write = #{delta})") - _sincedb_write - @sincedb_last_write = now - end + _sincedb_write end end # def _read_file public def sincedb_write(reason=nil) @logger.debug? && @logger.debug("caller requested sincedb write (#{reason})") - _sincedb_write + _sincedb_write(true) # since this is an external request, force the write end private @@ -218,6 +220,7 @@ def _sincedb_open @logger.debug? && @logger.debug("_sincedb_open: reading from #{path}") db.each do |line| ino, dev_major, dev_minor, pos = line.split(" ", 4) + inode = [ino, dev_major.to_i, dev_minor.to_i] sincedb_record_uid = [ino, dev_major.to_i, dev_minor.to_i] @logger.debug? && @logger.debug("_sincedb_open: setting #{sincedb_record_uid.inspect} to #{pos.to_i}") @sincedb[sincedb_record_uid] = pos.to_i @@ -226,19 +229,70 @@ def _sincedb_open end # def _sincedb_open private - def _sincedb_write + def _sincedb_write_if_pending + + # Check to see if sincedb should be written out since there was a file read after the sincedb flush, + # and during the sincedb_write_interval + + if @sincedb_write_pending + _sincedb_write + end + end + + private + def _sincedb_write(sincedb_force_write=false) + + # This routine will only write out sincedb if enough time has passed based on @sincedb_write_interval + # If it hasn't and we were asked to write, then we are pending a write. + + # if we were called with force == true, then we have to write sincedb and bypass a time check + # ie. external caller calling the public sincedb_write method + + if(@sincedb_writing) + @logger.warn? && @logger.warn("_sincedb_write already writing") + return + end + + @sincedb_writing = true + + if (!sincedb_force_write) + now = Time.now.to_i + delta = now - @sincedb_last_write + + # we will have to flush out the sincedb file after the interval expires. So, we will try again later. + if delta < @opts[:sincedb_write_interval] + @sincedb_write_pending = true + @sincedb_writing = false + return + end + end + + @logger.debug? && @logger.debug("writing sincedb (delta since last write = #{delta})") + path = @opts[:sincedb_path] - if File.device?(path) - IO.write(path, serialize_sincedb, 0) - else - File.atomic_write(path) {|file| file.write(serialize_sincedb) } + begin + if File.device?(path) + IO.write(path, serialize_sincedb, 0) + else + File.atomic_write(path) {|file| file.write(serialize_sincedb) } + end + rescue => e + @logger.warn("_sincedb_write failed: #{tmp}: #{e}") + @sincedb_writing = false + return end + + @sincedb_last_write = now + @sincedb_write_pending = false + @sincedb_writing = false + + System.gc() end # def _sincedb_write public def quit - _sincedb_write @watch.quit + _sincedb_write(true) end # def quit private diff --git a/lib/filewatch/watch.rb b/lib/filewatch/watch.rb index 9694eed..3d80889 100644 --- a/lib/filewatch/watch.rb +++ b/lib/filewatch/watch.rb @@ -16,11 +16,17 @@ def initialize(opts={}) @logger = Logger.new(STDERR) @logger.level = Logger::INFO end + @follow_only_path = false @watching = [] @exclude = [] @files = Hash.new { |h, k| h[k] = Hash.new } end # def initialize + public + def follow_only_path=(follow_only_path) + @follow_only_path = follow_only_path + end + public def logger=(logger) @logger = logger @@ -37,17 +43,31 @@ def watch(path) @watching << path _discover_file(path, true) end - return true end # def watch public def inode(path,stat) - if @iswindows - fileId = Winhelper.GetWindowsUniqueFileIdentifier(path) - inode = [fileId, stat.dev_major, stat.dev_minor] + if @follow_only_path + # In cases where files are rsynced to the consuming server, inodes will change when + # updated files overwrite original ones, resulting in inode changes. In order to + # avoid having the sincedb.member check from failing in this scenario, we'll + # construct the inode key using the path which will be 'stable' + # + # Because spaces and carriage returns are valid characters in linux paths, we have + # to take precautions to avoid having them show up in the .sincedb where they would + # derail any parsing that occurs in _sincedb_open. Since NULL (\0) is NOT a + # valid path character in LINUX (one of the few), we'll replace these troublesome + # characters with 'encodings' that won't be encountered in a normal path but will + # be handled properly by __sincedb_open + inode = [path.gsub(/ /, "\0\0").gsub(/\n/, "\0\1"), stat.dev_major, stat.dev_minor] else - inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor] + if @iswindows + fileId = Winhelper.GetWindowsUniqueFileIdentifier(path) + inode = [fileId, stat.dev_major, stat.dev_minor] + else + inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor] + end end return inode end @@ -95,6 +115,11 @@ def each(&block) elsif stat.size > @files[path][:size] @logger.debug? && @logger.debug("#{path}: file grew, old size #{@files[path][:size]}, new size #{stat.size}") yield(:modify, path) + else + # since there is no update, we should pass control back in case the caller needs to do any work + # otherwise, they can ONLY do other work when a file is created or modified + @logger.debug? && @logger.debug("#{path}: nothing to update") + yield(:noupdate, path) end @files[path][:size] = stat.size