diff --git a/.gitignore b/.gitignore index 8eb24fc..979e511 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ *.rbc core *.gem +.idea +Gemfile.lock diff --git a/filewatch.gemspec b/filewatch.gemspec index f4550ec..e1bf382 100644 --- a/filewatch.gemspec +++ b/filewatch.gemspec @@ -20,4 +20,5 @@ Gem::Specification.new do |spec| spec.homepage = "https://github.com/jordansissel/ruby-filewatch" spec.add_development_dependency "stud" + spec.add_runtime_dependency 'rest-client', ['~> 1.8'] end diff --git a/lib/filewatch/observing_tail.rb b/lib/filewatch/observing_tail.rb index 93c8e52..4d12aee 100644 --- a/lib/filewatch/observing_tail.rb +++ b/lib/filewatch/observing_tail.rb @@ -1,4 +1,5 @@ require 'filewatch/tail_base' +require 'rest-client' module FileWatch class ObservingTail @@ -6,17 +7,33 @@ class ObservingTail public class NullListener - def initialize(path) @path = path; end - def accept(line) end - def deleted() end - def created() end - def error() end - def eof() end - def timed_out() end + def initialize(path) + @path = path; + end + + def accept(line) + end + + def deleted() + end + + def created() + end + + def error() + end + + def eof() + end + + def timed_out() + end end class NullObserver - def listener_for(path) NullListener.new(path); end + def listener_for(path) + NullListener.new(path); + end end def subscribe(observer = NullObserver.new) @@ -25,48 +42,101 @@ def subscribe(observer = NullObserver.new) path = watched_file.path file_is_open = watched_file.file_open? listener = observer.listener_for(path) - case event - when :unignore - listener.created - _add_to_sincedb(watched_file, event) unless @sincedb.member?(watched_file.inode) - when :create, :create_initial - if file_is_open - @logger.debug? && @logger.debug("#{event} for #{path}: file already open") - next - end - if _open_file(watched_file, event) - listener.created - observe_read_file(watched_file, listener) - end - when :modify - if file_is_open - observe_read_file(watched_file, listener) + + + # ---------------------------------------------------------------------------------- + # Code modification to validate the file's MD5 Digest against a validation endpoint + # ---------------------------------------------------------------------------------- + # check if an authentication endpoint is provided for the watch object + if !@watch.auth_endpoint.nil? + auth_endpoint = @watch.auth_endpoint + @logger.debug? && @logger.debug("An authentication endpoint was found for file validation: #{auth_endpoint}") + + file_digest = Digest::MD5.file path + md5_hex_digest = file_digest.hexdigest + @logger.debug? && @logger.debug("Checksum MD5: #{md5_hex_digest} for file at path: #{path}") + + url = auth_endpoint + query_string = "?md5="+md5_hex_digest + + # check for other params and append them as query/path parameters accordingly + if !@watch.auth_params.nil? + @watch.auth_params.each { |param| + if param.include? "=" + query_string += ("&"+param) + else + url += param + end + } + + url += query_string + @logger.debug? && @logger.debug("Final validation URL: #{url}") else - @logger.debug? && @logger.debug(":modify for #{path}, file is not open, opening now") - if _open_file(watched_file, event) - observe_read_file(watched_file, listener) - end + url += query_string + @logger.debug? && @logger.debug("No additional params found. Final validation URL: #{url}") end - when :delete - if file_is_open - @logger.debug? && @logger.debug(":delete for #{path}, closing file") - observe_read_file(watched_file, listener) - watched_file.file_close - else - @logger.debug? && @logger.debug(":delete for #{path}, file already closed") + + begin + response = RestClient.get(url) + @logger.debug? && @logger.debug("Response from validation endpoint: #{response.body}") + rescue RestClient::ExceptionWithResponse => err + @logger.warn("An invalid file at path - #{path} has the validation response: #{err.response}") + @logger.debug? && @logger.debug("Response from validation endpoint: #{err.response}") + watched_file.unwatch + end + end + # ---------------------------------------------------------------------------------- + # End of Modifications + # ---------------------------------------------------------------------------------- + # continue processing only if the file status has not been changed to ":unwatched" as + # result of failed validation above. + + if !(watched_file.state == :unwatched) + case event + when :unignore + listener.created + _add_to_sincedb(watched_file, event) unless @sincedb.member?(watched_file.inode) + when :create, :create_initial + if file_is_open + @logger.debug? && @logger.debug("#{event} for #{path}: file already open") + next + end + if _open_file(watched_file, event) + listener.created + observe_read_file(watched_file, listener) + end + when :modify + if file_is_open + observe_read_file(watched_file, listener) + else + @logger.debug? && @logger.debug(":modify for #{path}, file is not open, opening now") + if _open_file(watched_file, event) + observe_read_file(watched_file, listener) + end + end + when :delete + if file_is_open + @logger.debug? && @logger.debug(":delete for #{path}, closing file") + observe_read_file(watched_file, listener) + watched_file.file_close + else + @logger.debug? && @logger.debug(":delete for #{path}, file already closed") + end + listener.deleted + when :timeout + @logger.debug? && @logger.debug(":timeout for #{path}, closing file") + watched_file.file_close + listener.timed_out + else + @logger.warn("unknown event type #{event} for #{path}") end - listener.deleted - when :timeout - @logger.debug? && @logger.debug(":timeout for #{path}, closing file") - watched_file.file_close - listener.timed_out - else - @logger.warn("unknown event type #{event} for #{path}") end end # @watch.subscribe # when watch.subscribe ends - its because we got quit _sincedb_write - end # def subscribe + end + + # def subscribe private def observe_read_file(watched_file, listener) diff --git a/lib/filewatch/tail_base.rb b/lib/filewatch/tail_base.rb index 082009a..ef214af 100644 --- a/lib/filewatch/tail_base.rb +++ b/lib/filewatch/tail_base.rb @@ -57,6 +57,8 @@ def initialize(opts={}) @watch.ignore_older = @opts[:ignore_older] @watch.delimiter = @opts[:delimiter] @watch.max_open_files = @opts[:max_open_files] + @watch.auth_endpoint = @opts[:auth_endpoint] + @watch.auth_params = @opts[:auth_params] @delimiter_byte_size = @opts[:delimiter].bytesize _sincedb_open diff --git a/lib/filewatch/watch.rb b/lib/filewatch/watch.rb index 281309f..d818e2d 100644 --- a/lib/filewatch/watch.rb +++ b/lib/filewatch/watch.rb @@ -34,6 +34,8 @@ def self.inode(path, stat) attr_accessor :logger attr_accessor :delimiter + attr_accessor :auth_endpoint + attr_accessor :auth_params attr_reader :max_active def initialize(opts={})