diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index bbdf387..5952918 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -13,6 +13,7 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base config_name "tcp" + concurrency :shared default :codec, "json" @@ -115,7 +116,7 @@ def register end # @ssl_enable if server? - workers_not_supported + @server_mutex = Mutex.new @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") begin @@ -144,35 +145,6 @@ def register end end end - - @codec.on_event do |event, payload| - @client_threads.each do |client_thread| - client_thread[:client].write(payload) - end - @client_threads.reject! {|t| !t.alive? } - end - else - client_socket = nil - @codec.on_event do |event, payload| - begin - client_socket = connect unless client_socket - r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) - # don't expect any reads, but a readable socket might - # mean the remote end closed, so read it and throw it away. - # we'll get an EOFError if it happens. - client_socket.sysread(16384) if r.any? - - # Now send the payload - client_socket.syswrite(payload) if w.any? - rescue => e - @logger.warn("tcp output exception", :host => @host, :port => @port, - :exception => e, :backtrace => e.backtrace) - client_socket.close rescue nil - client_socket = nil - sleep @reconnect_interval - retry - end - end end end # def register @@ -204,7 +176,38 @@ def server? end # def server? public - def receive(event) - @codec.encode(event) - end # def receive + def multi_receive_encoded(encoded) + if server? + @server_mutex.synchronize do + @client_threads.each do |client_thread| + encoded.each do |event,data| + client_thread[:client].write(data) + end + end + @client_threads.reject! {|t| !t.alive? } + end + else + client_socket = nil + begin + client_socket = connect unless client_socket + r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + client_socket.sysread(16384) if r.any? + + # Now send the payload + encoded.each do |event,data| + client_socket.syswrite(data) if w.any? + end + rescue => e + @logger.warn("tcp output exception", :host => @host, :port => @port, + :exception => e, :backtrace => e.backtrace) + client_socket.close rescue nil + client_socket = nil + sleep @reconnect_interval + retry + end + end + end end # class LogStash::Outputs::Tcp