Skip to content

Commit 4b95924

Browse files
authored
Merge pull request #504 from rock-core/handle_unavailable_process_managers
feat: support unavailable process managers
2 parents f532785 + dac9655 commit 4b95924

File tree

15 files changed

+305
-51
lines changed

15 files changed

+305
-51
lines changed

lib/syskit/exceptions.rb

+21
Original file line numberDiff line numberDiff line change
@@ -1104,4 +1104,25 @@ def pretty_print(pp)
11041104
@deployment.pretty_print(pp)
11051105
end
11061106
end
1107+
1108+
# Exception raised whenever a task is deployed on a disabled process manager
1109+
class DeployedOnDisabledProcessManager < SpecError
1110+
# The task that was found deployed on the disabled process manager
1111+
attr_reader :task
1112+
1113+
# The deployment task through which {#task} is deployed
1114+
attr_reader :deployment_task
1115+
1116+
# @param task the offending task
# @param deployment_task the deployment task that `task` is deployed by;
#   its `:on` argument is reported in the error message
#
# NOTE(review): does not call `super` -- confirm that SpecError (not
# visible here) needs no initialization of its own
def initialize(task, deployment_task)
1117+
@task = task
1118+
@deployment_task = deployment_task
1119+
end
1120+
1121+
# Describe the error on the given pretty-printer
#
# Outputs the value of the deployment task's `:on` argument (reported as
# being disabled), followed by the pretty-printed task
#
# @param pp the pretty-printer to write to (presumably a PP instance)
def pretty_print(pp)
1122+
pp.text "the following task was deployed on "
1123+
pp.text "#{@deployment_task.arguments[:on]}, which is currently disabled"
1124+
pp.breakable
1125+
task.pretty_print(pp)
1126+
end
1127+
end
11071128
end

lib/syskit/network_generation/system_network_deployer.rb

+33-2
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,15 @@ def apply_selected_deployments(selected_deployments, deployment_tasks = {})
209209
# Run all the validations on the deployed network, in sequence
#
# The same error handler is forwarded to each verification step.
#
# @param error_handler the object that receives the validation failures;
#   defaults to a new RaiseErrorHandler
def validate_deployed_network(error_handler: RaiseErrorHandler.new)
210210
verify_all_tasks_deployed(error_handler: error_handler)
211211
verify_all_configurations_exist(error_handler: error_handler)
212+
# Flag tasks whose deployment's process manager is not available
verify_all_process_managers_enabled(error_handler: error_handler)
212213
end
213214

214215
# Verifies that all tasks in the plan are deployed
215216
#
216217
# @param [ResolutionErrorHandler | RaiseErrorHandler] error_handler
217-
def verify_all_tasks_deployed(error_handler)
218+
def verify_all_tasks_deployed(error_handler: RaiseErrorHandler.new)
218219
self.class.verify_all_tasks_deployed(
219-
plan, default_deployment_group, error_handler
220+
plan, default_deployment_group, error_handler: error_handler
220221
)
221222
end
222223

@@ -256,6 +257,36 @@ def self.verify_all_tasks_deployed(
256257
end
257258
end
258259

260+
# Generate errors for all tasks that are using a deployment from a disabled
261+
# or otherwise unavailable process manager
262+
#
263+
# @param [#register_resolution_failures_from_exception] error_handler
264+
def verify_all_process_managers_enabled(error_handler: RaiseErrorHandler.new)
265+
# Instance-level convenience: delegate to the class-level implementation,
# applying it to this object's plan
self.class.verify_all_process_managers_enabled(
266+
plan, error_handler: error_handler
267+
)
268+
end
269+
270+
# Generate errors for all tasks that are using a deployment from a disabled
271+
# or otherwise unavailable process manager
272+
#
273+
# @param [Roby::Plan] plan
274+
# @param [#register_resolution_failures_from_exception] error_handler
275+
def self.verify_all_process_managers_enabled(
276+
plan, error_handler: RaiseErrorHandler.new
277+
)
278+
# Select the deployment tasks whose process server configuration reports
# itself as not available
failed =
279+
plan.find_local_tasks(Deployment)
280+
.find_all { |d| !d.process_server_config.available? }
281+
282+
# Register one failure per task executed by each of these deployments, so
# that the error is attached to the task and not to the deployment
failed.each do |deployment_task|
283+
deployment_task.each_executed_task do |t|
284+
e = DeployedOnDisabledProcessManager.new(t, deployment_task)
285+
error_handler.register_resolution_failures_from_exception(t, e)
286+
end
287+
end
288+
end
289+
259290
# Verifies that all selected configuration sections exist
260291
#
261292
# @return [Array<ResolutionError>] resolution errors of all the tasks that

lib/syskit/process_managers/remote/manager.rb

+27-5
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def inspect
6565

6666
STATE_CONNECTED = "connected"
6767
STATE_DISCONNECTED = "disconnected"
68+
STATE_CLOSED = "closed"
6869

6970
def available?
7071
@state == STATE_CONNECTED
@@ -85,12 +86,14 @@ def initialize(
8586
Syskit.conf.remote_process_managers_response_timeout,
8687
root_loader: Orocos.default_loader,
8788
register_on_name_server: true,
88-
connect_executor: :io
89+
connect_executor: :io,
90+
create_log_dir: true
8991
)
9092
@host = host
9193
@port = port
9294
@state = STATE_DISCONNECTED
9395
@response_timeout = response_timeout
96+
@create_log_dir = create_log_dir
9497

9598
@processes = {}
9699
@death_queue = []
@@ -135,11 +138,22 @@ def connect
135138

136139
# Perform the periodic processing that matches the current connection state
#
# Only the closed and disconnected states have work to do; in any other
# state (e.g. STATE_CONNECTED) this method does nothing.
def poll
137140
case @state
141+
when STATE_CLOSED
142+
poll_in_closed_state
138143
when STATE_DISCONNECTED
139144
poll_in_disconnected_state
140145
end
141146
end
142147

148+
# Polling step used once the manager has been closed
#
# Disposes of a connection attempt that may still be pending, so that a
# socket obtained after the close does not stay open.
def poll_in_closed_state
149+
# Read the output of the future if there is one, and close
150+
# the possibly existing socket
151+
# Returns early if there is no future, or if it has no result yet
# (presumably `result(0)` is a zero-timeout, non-blocking read -- confirm
# against the future implementation)
return unless (result = @connect_future&.result(0))
152+
153+
# The second element of the result is presumably the socket, nil if the
# connection attempt failed -- TODO confirm
result[1]&.close
154+
@connect_future = nil
155+
end
156+
143157
def poll_in_disconnected_state
144158
if @connect_future
145159
return unless (result = @connect_future.result(0))
@@ -174,14 +188,22 @@ def handle_new_connection(socket)
174188

175189
@server_pid = pid
176190
@loader = Loader.new(self, @root_loader)
191+
192+
if @create_log_dir
193+
create_log_dir(
194+
Roby.app.time_tag, { "parent" => Roby.app.app_metadata }
195+
)
196+
end
197+
kill_all if Syskit.conf.kill_all_on_process_server_connection?
198+
177199
ProcessManagers.info "connected to remote process manager #{self}"
178200
rescue StandardError => e
179201
ProcessManagers.warn(
180202
"got a socket to remote process manager #{self}, but the first " \
181203
"call failed: #{e.message}"
182204
)
183205

184-
close
206+
close(state: STATE_DISCONNECTED)
185207
schedule_connection_attempt
186208
end
187209

@@ -392,9 +414,9 @@ def disconnect
392414
close
393415
end
394416

395-
def close
396-
@state = STATE_DISCONNECTED
397-
@socket.close
417+
def close(state: STATE_CLOSED)
418+
@state = state
419+
@socket&.close
398420
end
399421

400422
def write_command(cmd, args = nil)

lib/syskit/process_managers/remote/server/process.rb

+1-2
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ def spawn_setup_forked_process_and_exec(ior_write_fd, control_read_fd)
360360
ENV["ORO_LOGFILE"] = resolve_orocos_logger_output(pid)
361361

362362
::Process.setpgrp
363-
debug "command line: #{@command} #{arguments.join(' ')}"
363+
info "starting: #{@command} #{arguments.join(' ')}"
364364
exec(@command, *arguments,
365365
control_read_fd => control_read_fd,
366366
ior_write_fd => ior_write_fd,
@@ -423,7 +423,6 @@ def kill(hard: false)
423423
rescue Errno::ESRCH # rubocop:disable Lint/SuppressedException
424424
end
425425
else
426-
puts "control FD"
427426
@control_write_fd.write("Q")
428427
end
429428
end

lib/syskit/roby_app/configuration.rb

+8-5
Original file line numberDiff line numberDiff line change
@@ -759,10 +759,6 @@ def register_remote_manager(
759759
client = ProcessManagers::Remote::Manager.new(
760760
host, port, root_loader: app.default_loader
761761
)
762-
client.create_log_dir(
763-
Roby.app.time_tag, { "parent" => Roby.app.app_metadata }
764-
)
765-
client.kill_all if kill_all_on_process_server_connection?
766762
config = register_process_server(
767763
name, client,
768764
host_id: host_id || name,
@@ -775,12 +771,19 @@ def register_remote_manager(
775771

776772
ProcessServerConfig =
777773
Struct.new :name, :client, :log_dir, :host_id, :supports_log_transfer,
778-
:logging_enabled, :register_on_name_server,
774+
:logging_enabled, :register_on_name_server, :disabled,
779775
keyword_init: true do
780776
def manager
781777
client
782778
end
783779

780+
# Whether this process manager can currently be used
#
# @return [Boolean] false if the configuration is flagged as disabled;
#   otherwise whatever the client's own #available? reports, if the client
#   implements it; true for clients that do not implement #available?
def available?
781+
return false if disabled
782+
return client.available? if client.respond_to?(:available?)
783+
784+
true
785+
end
786+
784787
def on_localhost?
785788
host_id == "localhost" || host_id == "syskit"
786789
end

lib/syskit/roby_app/plugin.rb

+3-22
Original file line numberDiff line numberDiff line change
@@ -718,28 +718,9 @@ def self.create_local_process_server_client(app)
718718
"no process server is being started"
719719
end
720720

721-
# Wait for the server to be ready
722-
client = nil
723-
until client
724-
client =
725-
begin ProcessManagers::Remote::Manager.new("localhost", @server_port)
726-
rescue Errno::ECONNREFUSED
727-
sleep 0.1
728-
is_running =
729-
begin
730-
!::Process.waitpid(@server_pid, ::Process::WNOHANG)
731-
rescue Errno::ESRCH
732-
false
733-
end
734-
735-
unless is_running
736-
raise ProcessManagers::Remote::Manager::StartupFailed,
737-
"the local process server failed to start"
738-
end
739-
740-
nil
741-
end
742-
end
721+
client = ProcessManagers::Remote::Manager.new(
722+
"localhost", @server_port, create_log_dir: false
723+
)
743724

744725
# Verify that the server is actually ours (i.e. check that there
745726
# was not one that was still running)

lib/syskit/runtime/update_deployment_states.rb

+12
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,24 @@ def self.update_deployment_states(plan)
1010
# #cleanup_dead_connections, thus avoiding to disconnect connections
1111
# between already-dead processes
1212

13+
poll_managers
1314
handle_dead_deployments(plan)
1415
trigger_ready_deployments(plan)
1516
end
1617

18+
# Call #poll on the client of every registered process server configuration
#
# Clients that do not respond to #poll are skipped.
def self.poll_managers
19+
# Iterate over a copy of the configuration list (presumably so that the
# iteration is unaffected by registrations changing meanwhile -- confirm)
server_config = Syskit.conf.each_process_server_config.to_a
20+
server_config.each do |config|
21+
config.client.poll if config.client.respond_to?(:poll)
22+
end
23+
end
24+
1725
def self.handle_dead_deployments(plan)
1826
all_dead_deployments = Set.new
1927
server_config = Syskit.conf.each_process_server_config.to_a
2028
server_config.each do |config|
29+
next unless config.available?
30+
2131
begin
2232
dead_deployments = config.client.wait_termination
2333
rescue ::Exception => e
@@ -42,6 +52,8 @@ def self.trigger_ready_deployments(plan)
4252
not_ready_deployments = find_all_not_ready_deployments(plan)
4353
not_ready_deployments.each do |process_server_name, deployments|
4454
server_config = Syskit.conf.process_server_config_for(process_server_name)
55+
next unless server_config.available?
56+
4557
wait_result = server_config.client.wait_running(
4658
*deployments.map { |d| d.arguments[:process_name] }
4759
)

lib/syskit/scripts/process_server.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
require "optparse"
77

8+
server_port = Syskit::ProcessManagers::Remote::DEFAULT_PORT
9+
810
options = Hash[host: "localhost"]
911
parser = OptionParser.new do |opt|
1012
opt.on "--fd=FD", Integer, "the socket that should be used as TCP server" do |fd|
@@ -13,12 +15,14 @@
1315
opt.on "--log-dir=DIR", String, "the directory that should be used for logs" do |dir|
1416
Roby.app.log_dir = dir
1517
end
18+
opt.on "--port=PORT", Integer, "the port to listen on" do |port|
19+
server_port = port
20+
end
1621
opt.on("--debug", "turn on debug mode") do
1722
Syskit::ProcessManagers::Remote::Server.logger.level = Logger::DEBUG
1823
end
1924
end
2025

21-
server_port = Syskit::ProcessManagers::Remote::DEFAULT_PORT
2226
Roby::Application.host_options(parser, options)
2327
parser.parse(ARGV)
2428

lib/syskit/test/network_manipulation.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,14 @@ def syskit_stub_task_context_model(name, &block)
351351
# @param [Boolean] register the configured deployment in the test group
352352
# This makes it available to further deployments
353353
# @return [Syskit::Models::ConfiguredDeployment]
354-
def syskit_stub_configured_deployment(
354+
def syskit_stub_configured_deployment( # rubocop:disable Metrics/ParameterLists
355355
task_model = nil, task_name = syskit_default_stub_name(task_model),
356356
remote_task: syskit_stub_resolves_remote_tasks?,
357-
register: true, read_only: [], &block
357+
on: "stubs", register: true, read_only: [], &block
358358
)
359359
configured_deployment = @__stubs.stub_configured_deployment(
360360
task_model, task_name,
361-
read_only: read_only, remote_task: remote_task, &block
361+
on: on, read_only: read_only, remote_task: remote_task, &block
362362
)
363363
if register
364364
@__test_deployment_group

lib/syskit/test/stubs.rb

+5-4
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,14 @@ def handle_abstract_component_model_in_requirements(model)
285285
# @api private
286286
#
287287
# Computes a configured deployment suitable to deploy the given task model
288-
def stub_configured_deployment(
288+
def stub_configured_deployment( # rubocop:disable Metrics/ParameterLists
289289
task_model = nil, task_name = default_stub_name,
290-
remote_task: false, read_only: [], logger_name: nil, &block
290+
remote_task: false, read_only: [], logger_name: nil,
291+
on: "stubs", &block
291292
)
292293
deployment_model = stub_deployment_model(task_model, task_name, &block)
293294

294-
process_server = Syskit.conf.process_server_for("stubs")
295+
process_server = Syskit.conf.process_server_for(on)
295296
task_context_class =
296297
if remote_task
297298
Orocos::RubyTasks::RemoteTaskContext
@@ -300,7 +301,7 @@ def stub_configured_deployment(
300301
end
301302

302303
Models::ConfiguredDeployment.new(
303-
"stubs", deployment_model, { task_name => task_name }, task_name,
304+
on, deployment_model, { task_name => task_name }, task_name,
304305
Hash[task_context_class: task_context_class],
305306
read_only: read_only, logger_name: logger_name
306307
)

test/network_generation/test_system_network_deployer.rb

+31-1
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ def make_candidates(count)
570570
deployment_m.define_deployed_task("task", task_m)
571571
plan.add(
572572
deployment = deployment_m.new(
573-
name_mappings: { "task" => "task" }
573+
on: "localhost", name_mappings: { "task" => "task" }
574574
)
575575
)
576576
plan.add(task = deployment.task("task"))
@@ -581,6 +581,36 @@ def make_candidates(count)
581581
end
582582
end
583583
end
584+
585+
describe "validation that all process managers are enabled" do
586+
it "rejects tasks that are using a deployment " \
587+
"from a disabled manager" do
588+
# Register a dedicated manager and flag it as disabled
mng = register_ruby_tasks_manager("disabled_manager")
589+
mng.disabled = true
590+
591+
# Deploy a task model on that manager and instanciate it in the plan
task_m = Syskit::TaskContext.new_submodel
592+
deployment = syskit_stub_configured_deployment(
593+
task_m, on: "disabled_manager"
594+
)
595+
ir = task_m.to_instance_requirements
596+
ir.use_configured_deployment(deployment)
597+
task = ir.instanciate(plan)
598+
599+
# Use a handler that collects the failures so that they can be inspected
error_handler = ResolutionErrorHandler.new(
600+
deployer.plan, deployer.merge_solver
601+
)
602+
execute do
603+
deployer.deploy(error_handler: error_handler)
604+
end
605+
606+
# Exactly one failure is expected, and it must refer to the deployed task
assert_equal 1, error_handler.resolution_failures.size
607+
e = error_handler.resolution_failures.first
608+
assert_kind_of DeployedOnDisabledProcessManager,
609+
e.original_exception
610+
# The task may have been replaced during deployment; compare against
# what the merge solver reports as its replacement
assert_equal deployer.merge_solver.replacement_for(task),
611+
e.original_exception.task
612+
end
613+
end
584614
end
585615

586616
def deployed_task_helper(model, name)

0 commit comments

Comments
 (0)