Skip to content

Commit 3945d4d

Browse files
committed
feat: make Manager compatible with being initially disconnected
So far only the Manager class itself supports it. The rest of Syskit won't work (yet) when that happens.
1 parent fa4ba97 commit 3945d4d

File tree

3 files changed

+249
-31
lines changed

3 files changed

+249
-31
lines changed

lib/syskit/process_managers/remote/manager.rb

+98-26
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class StartupFailed < RuntimeError; end
4949
# The hostname we are connected to
5050
attr_reader :host
5151
# The port we are connected to on +hostname+
52-
attr_reader :port
52+
attr_accessor :port
5353
# The PID of the server process
5454
attr_reader :server_pid
5555
# A string that allows to uniquely identify this process server
@@ -77,40 +77,112 @@ def available?
7777
# root for this client's loader
7878
def initialize(
7979
host = "localhost", port = DEFAULT_PORT,
80-
connect_timeout: 10,
81-
response_timeout: 10,
80+
initial_connection_timeout:
81+
Syskit.conf.remote_process_managers_initial_connection_timeout,
82+
connection_timeout:
83+
Syskit.conf.remote_process_managers_connection_timeout,
84+
response_timeout:
85+
Syskit.conf.remote_process_managers_response_timeout,
8286
root_loader: Orocos.default_loader,
83-
register_on_name_server: true
87+
register_on_name_server: true,
88+
connect_executor: :io
8489
)
8590
@host = host
8691
@port = port
8792
@state = STATE_DISCONNECTED
8893
@response_timeout = response_timeout
89-
@socket =
90-
begin Socket.tcp(host, port, connect_timeout: connect_timeout)
91-
rescue Errno::ECONNREFUSED => e
92-
raise e.class,
93-
"cannot contact process server at " \
94-
"'#{host}:#{port}': #{e.message}"
95-
end
96-
97-
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
98-
@socket.fcntl(Fcntl::FD_CLOEXEC, 1)
99-
@state = STATE_CONNECTED
100-
101-
begin
102-
@server_pid = pid
103-
rescue EOFError
104-
close
105-
raise StartupFailed, "process server failed at '#{host}:#{port}'"
106-
end
10794

108-
@loader = Loader.new(self, root_loader)
109-
@root_loader = loader.root_loader
11095
@processes = {}
11196
@death_queue = []
11297
@host_id = "#{host}:#{port}:#{server_pid}"
11398
@register_on_name_server = register_on_name_server
99+
@root_loader = root_loader
100+
101+
@connection_timeout = connection_timeout
102+
@connect_executor = connect_executor
103+
104+
# For now, make the first connection attempt
105+
perform_initial_connection(
106+
deadline: Roby.monotonic_time + initial_connection_timeout
107+
)
108+
109+
if !Syskit.conf.remote_process_managers_accept_failed_connections? &&
110+
!available?
111+
raise ComError,
112+
"connection to #{self} failed and " \
113+
"remote_process_managers_accept_failed_connections is false"
114+
end
115+
end
116+
117+
def perform_initial_connection(deadline:)
118+
while deadline > Roby.monotonic_time
119+
attempt_connection.result(@connection_timeout + @response_timeout)
120+
poll
121+
break if available?
122+
123+
sleep 0.1
124+
end
125+
end
126+
127+
def connect
128+
socket = Socket.tcp(
129+
host, port, connect_timeout: @connection_timeout
130+
)
131+
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
132+
socket.fcntl(Fcntl::FD_CLOEXEC, 1)
133+
socket
134+
end
135+
136+
def poll
137+
case @state
138+
when STATE_DISCONNECTED
139+
poll_in_disconnected_state
140+
end
141+
end
142+
143+
def poll_in_disconnected_state
144+
if @connect_future
145+
return unless (result = @connect_future.result(0))
146+
147+
@connect_future = nil
148+
_, socket, error = result
149+
return handle_new_connection(socket) if socket
150+
151+
ProcessManagers.warn(
152+
"failed to connect to remote process manager #{self}: " \
153+
"#{error.message}"
154+
)
155+
schedule_connection_attempt
156+
elsif Roby.monotonic_time > @next_connection_deadline
157+
attempt_connection
158+
end
159+
end
160+
161+
def schedule_connection_attempt
162+
@next_connection_deadline = Roby.monotonic_time
163+
end
164+
165+
def attempt_connection
166+
@connect_future = Concurrent::Promises.future_on(@connect_executor) do
167+
connect
168+
end
169+
end
170+
171+
def handle_new_connection(socket)
172+
@socket = socket
173+
@state = STATE_CONNECTED
174+
175+
@server_pid = pid
176+
@loader = Loader.new(self, @root_loader)
177+
ProcessManagers.info "connected to remote process manager #{self}"
178+
rescue StandardError => e
179+
ProcessManagers.warn(
180+
"got a socket to remote process manager #{self}, but the first " \
181+
"call failed: #{e.message}"
182+
)
183+
184+
close
185+
schedule_connection_attempt
114186
end
115187

116188
def pid
@@ -346,8 +418,8 @@ def read_object(deadline:)
346418
Marshal.load(@socket)
347419
end
348420

349-
class ComError < RuntimeError
350-
end
421+
class TimeoutError < RuntimeError; end
422+
class ComError < RuntimeError; end
351423

352424
def wait_for_answer(deadline: Roby.monotonic + timeout)
353425
validate_available

lib/syskit/roby_app/configuration.rb

+56-5
Original file line numberDiff line numberDiff line change
@@ -134,20 +134,65 @@ def early_deploy?
134134
@early_deploy
135135
end
136136

137-
# Whether to capture errors during network resolution instead of raising them.
138-
def capture_errors_during_network_resolution?
139-
@capture_errors_during_network_resolution
140-
end
141-
142137
# Controls where the deployment stage happens
143138
#
144139
# @see early_deploy?
145140
attr_writer :early_deploy
146141

142+
# Whether to capture errors during network resolution instead of raising them.
143+
def capture_errors_during_network_resolution?
144+
@capture_errors_during_network_resolution
145+
end
146+
147147
# Controls whether to capture errors instead of raising them during network
148148
# resolution
149149
attr_writer :capture_errors_during_network_resolution
150150

151+
# Whether the initial connection to process servers is allowed to fail
152+
#
153+
# When this flag is enabled, the initial connection to remote process managers
154+
# is allowed to fail. When it happens, Syskit will generate errors for
155+
# attempts to deploy things that run on these process managers.
156+
#
157+
# By default, Syskit will re-try to connect for a full minute before it
158+
# assumes the process manager is not available, to keep a "good" behaviour
159+
# w.r.t. boot of distributed systems.
160+
#
161+
# It is strongly recommended to turn this on with `early_deploy` and
162+
# `capture_errors_during_network_resolution`
163+
def remote_process_managers_accept_failed_connections?
164+
@remote_process_managers_accept_failed_connections
165+
end
166+
167+
# Sets whether the initial connection to process servers is
168+
# allowed to fail
169+
#
170+
# @see #remote_process_managers_accept_failed_connections?
171+
attr_writer :remote_process_managers_accept_failed_connections
172+
173+
# Period at which the remote process managers will retry connecting
174+
# to the remote servers
175+
attr_accessor :remote_process_managers_connection_retry_period
176+
177+
# Timeout for connection to remote process managers
178+
attr_accessor :remote_process_managers_connection_timeout
179+
180+
# Timeout for replies from remote process managers
181+
attr_accessor :remote_process_managers_response_timeout
182+
183+
# Timeout for the initial connection to a process manager
184+
#
185+
# When the manager is instantiated for the first time, Syskit will wait
186+
# for its initial connection attempt before moving on with the rest
187+
# of the initialization.
188+
#
189+
# During this initial connection phase, Syskit will try to connect for
190+
# this many seconds. Each attempt uses
191+
# {#remote_process_managers_connection_timeout}
192+
# and {#remote_process_managers_response_timeout}, and retries are
193+
# spaced by {#remote_process_managers_connection_retry_period}
194+
attr_accessor :remote_process_managers_initial_connection_timeout
195+
151196
# Controls whether the orogen types should be exported as Ruby
152197
# constants
153198
#
@@ -184,6 +229,12 @@ def initialize(app)
184229
@early_deploy = false
185230
@capture_errors_during_network_resolution = false
186231

232+
@remote_process_managers_accept_failed_connections = false
233+
@remote_process_managers_connection_retry_period = 5
234+
@remote_process_managers_connection_timeout = 10
235+
@remote_process_managers_response_timeout = 10
236+
@remote_process_managers_initial_connection_timeout = 60
237+
187238
@log_rotation_period = nil
188239
@log_transfer = LogTransferManager::Configuration.new(
189240
user: "syskit",

test/process_managers/test_remote.rb

+95
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,86 @@
5353
end
5454
end
5555

56+
describe "when not accepting failed connections" do
57+
before do
58+
update_and_restore_attr(
59+
Syskit.conf, :remote_process_managers_accept_failed_connections?, false
60+
)
61+
end
62+
63+
it "raises if the server allows connection but does not reply" do
64+
@server = Syskit::ProcessManagers::Remote::Server::Server.new(
65+
@app, port: 0, name_service_ip: "127.0.0.1"
66+
)
67+
@server.open
68+
69+
assert_raises(Syskit::ProcessManagers::Remote::Manager::ComError) do
70+
Syskit::ProcessManagers::Remote::Manager.new(
71+
"localhost", server.port,
72+
root_loader: root_loader,
73+
connection_timeout: 1, response_timeout: 1,
74+
initial_connection_timeout: 1
75+
)
76+
end
77+
end
78+
79+
it "raises if there are no servers" do
80+
assert_raises(Syskit::ProcessManagers::Remote::Manager::ComError) do
81+
Syskit::ProcessManagers::Remote::Manager.new(
82+
"localhost", 12_222,
83+
root_loader: root_loader,
84+
connection_timeout: 1, response_timeout: 1,
85+
initial_connection_timeout: 1
86+
)
87+
end
88+
end
89+
end
90+
91+
describe "when accepting failed connections" do
92+
before do
93+
update_and_restore_attr(
94+
Syskit.conf, :remote_process_managers_accept_failed_connections?, true
95+
)
96+
end
97+
98+
it "handles a server that allows connection but does not reply" do
99+
@server = Syskit::ProcessManagers::Remote::Server::Server.new(
100+
@app, port: 0, name_service_ip: "127.0.0.1"
101+
)
102+
@server.open
103+
104+
client = Syskit::ProcessManagers::Remote::Manager.new(
105+
"localhost", server.port,
106+
root_loader: root_loader,
107+
connection_timeout: 1, response_timeout: 1,
108+
initial_connection_timeout: 1
109+
)
110+
refute client.available?
111+
112+
@server_thread = Thread.new { server.listen }
113+
assert_client_is_eventually_available(client)
114+
end
115+
116+
it "handles a server that shows up late" do
117+
client = Syskit::ProcessManagers::Remote::Manager.new(
118+
"localhost", 12_222,
119+
root_loader: root_loader,
120+
connection_timeout: 1, response_timeout: 1,
121+
initial_connection_timeout: 1
122+
)
123+
refute client.available?
124+
125+
@server = Syskit::ProcessManagers::Remote::Server::Server.new(
126+
@app, port: 0, name_service_ip: "127.0.0.1"
127+
)
128+
@server.open
129+
@server_thread = Thread.new { server.listen }
130+
131+
client.port = @server.port
132+
assert_client_is_eventually_available(client)
133+
end
134+
end
135+
56136
describe "#pid" do
57137
before do
58138
@client = start_and_connect_to_server
@@ -645,4 +725,19 @@ def assert_eventually(timeout: 5, &block)
645725

646726
flunk("#{block} did not return true in #{timeout} seconds")
647727
end
728+
729+
def assert_client_is_eventually_available(client, timeout: 5)
730+
now = Roby.monotonic_time
731+
deadline = now + timeout
732+
while deadline > now
733+
return if client.available?
734+
735+
client.poll
736+
sleep 0.01
737+
738+
now = Roby.monotonic_time
739+
end
740+
741+
flunk("client did not become available in #{timeout} seconds")
742+
end
648743
end

0 commit comments

Comments
 (0)