Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "logstash/util/socket_peer"
require "logstash-input-tcp_jars"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/port_management_support'

require "socket"
require "openssl"
Expand Down Expand Up @@ -68,6 +69,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# ecs_compatibility option, provided by Logstash core or the support adapter.
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)

include LogStash::PluginMixins::PortManagementSupport

config_name "tcp"

default :codec, "line"
Expand Down Expand Up @@ -177,15 +180,20 @@ def register
validate_ssl_config!

if server?
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
@port_reservation = port_management.reserve(addr: @host, port: @port) do |reserved_addr, reserved_port|
@loop = InputLoop.new(@id, reserved_addr, reserved_port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
end
Comment on lines +183 to +185
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to use blocks here since we're not wrapping behavior:

Suggested change
@port_reservation = port_management.reserve(addr: @host, port: @port) do |reserved_addr, reserved_port|
@loop = InputLoop.new(@id, reserved_addr, reserved_port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
end
port_management.reserve(port: @port) # if this succeeds, there is a reservation for the port
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)

Also we should set the reservation scope for the port alone. Not sure if it's worth differentiating the addr, we can be conservative here and allow the port to be reserved regardless of the addr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're not wrapping behavior

We are.

PortManagementSupport::Reservation#initialize releases the reservation if an exception is raised by the block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we should set the reservation scope for the port alone. Not sure if it's worth differentiating the addr, we can be conservative here and allow the port to be reserved regardless of the addr.

In order to spawn the server that effectively holds the reservation, we need to know the addr to bind to, so really we are reserving an addr:port pair, not just the port.

end
end

def run(output_queue)
@output_queue = output_queue
if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enabled => @ssl_enabled)
@loop.run
@port_reservation.convert do |reserved_addr, reserved_port|
@logger.info("Starting tcp input listener", :address => "#{reserved_addr}:#{reserved_port}", :ssl_enabled => @ssl_enabled)
@loop.start
end
@loop.wait_until_closed
Comment on lines +192 to +196
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just do:

Suggested change
@port_reservation.convert do |reserved_addr, reserved_port|
@logger.info("Starting tcp input listener", :address => "#{reserved_addr}:#{reserved_port}", :ssl_enabled => @ssl_enabled)
@loop.start
end
@loop.wait_until_closed
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enabled => @ssl_enabled)
port_management.release(port: @port) { @loop.run }

Essentially tell the global manager:

  1. ok I'm here.
  2. release the port but don't free the reservation until I'm out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Reservation#convert, the global mutex for creating reservations is held while the block is executed, and we don't want to hold that global lock for longer than necessary because other plugins using this mixin will be unable to create or convert while we hold the global lock.

The point of allowing the caller to execute a block while the global lock is held is to ensure that some other plugin can't use this library to create another reservation in the window between the dummy server being shut down and the caller standing up something to replace it.

If we were to add a second layer of locking (e.g., each reservation having its own mutex), we could do the bind outside of the global lock but we would introduce complexity around the race conditions.

else
run_client()
end
Expand Down
1 change: 1 addition & 0 deletions logstash-input-tcp.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.2'
s.add_runtime_dependency 'logstash-mixin-port_management_support', '~>1.0'

s.add_runtime_dependency 'logstash-core', '>= 8.1.0'

Expand Down
38 changes: 27 additions & 11 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,27 @@
#Cabin::Channel.get(LogStash).level = :debug
describe LogStash::Inputs::Tcp, :ecs_compatibility_support do

def get_port
begin
# Start high to better avoid common services
port = rand(10000..65535)
s = TCPServer.new("127.0.0.1", port)
s.close
return port
rescue Errno::EADDRINUSE
retry
end
##
# yield the block with a port that is available
# @return [Integer]: a port that is available
def find_available_port
with_bound_port(&:itself)
end

##
# Yields block with a port that is unavailable
# @yieldparam port [Integer]
# @yieldreturn [Object]
# @return [Object]
def with_bound_port(port=0, &block)
server = TCPServer.new("::", port)

return yield(server.local_address.ip_port)
ensure
server.close
end

let(:port) { get_port }
let(:port) { find_available_port }

context "codec (PR #1372)" do
it "switches from plain to line" do
Expand Down Expand Up @@ -373,6 +381,14 @@ def get_port
expect { subject.register }.to_not raise_error
end

context "when the port is unavailable" do
it 'raises a helpful exception' do
with_bound_port(port) do |unavailable_port|
expect { subject.register }.to raise_error(Errno::EADDRINUSE)
end
end
end

context "when using ssl" do
let(:config) do
{
Expand Down
30 changes: 25 additions & 5 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Plain TCP Server Implementation.
*/
public final class InputLoop implements Runnable, Closeable {
public final class InputLoop implements Closeable {

// historically this class was passing around the plugin's logger
private static final Logger logger = LogManager.getLogger("logstash.inputs.tcp");
Expand All @@ -46,6 +46,11 @@ public final class InputLoop implements Runnable, Closeable {
*/
private final ServerBootstrap serverBootstrap;

/**
* The channel after starting
*/
private volatile Channel channel;

/**
* SSL configuration.
*/
Expand Down Expand Up @@ -82,11 +87,26 @@ public InputLoop(final String id, final String host, final int port, final Decod
.childHandler(new InputLoop.InputHandler(decoder, sslContext));
}

@Override
public void run() {
public synchronized void start() {
if (channel != null) {
throw new IllegalStateException("Already started");
}
try {
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
} catch (final InterruptedException ex) {
channel = serverBootstrap.bind(host, port).sync().channel();
}catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}

public void waitUntilClosed() {
synchronized (this) {
if (channel == null) {
throw new IllegalStateException("not started");
}
}
try {
channel.closeFuture().sync();
}catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
Expand Down