Skip to content

feat: support unavailable process managers #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 29, 2025
21 changes: 21 additions & 0 deletions lib/syskit/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1104,4 +1104,25 @@ def pretty_print(pp)
@deployment.pretty_print(pp)
end
end

# Exception raised whenever a task is deployed on a disabled process manager
class DeployedOnDisabledProcessManager < SpecError
    # @return the task that was deployed on the disabled process manager
    attr_reader :task

    # @return the deployment task involved in the error; its +:on+ argument
    #   is displayed as the name of the disabled process manager
    attr_reader :deployment_task

    # @param task the task deployed on the disabled process manager
    # @param deployment_task the deployment task associated with it
    def initialize(task, deployment_task)
        @deployment_task = deployment_task
        @task = task
    end

    # Describe the error on the given pretty-printer
    #
    # The process manager name is printed first, followed by the
    # pretty-printed task on a separate breakable section
    #
    # @param pp the pretty-printer object to write to
    def pretty_print(pp)
        pp.text "the following task was deployed on "
        manager_name = deployment_task.arguments[:on]
        pp.text "#{manager_name}, which is currently disabled"
        pp.breakable
        task.pretty_print(pp)
    end
end
end
35 changes: 33 additions & 2 deletions lib/syskit/network_generation/system_network_deployer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,15 @@ def apply_selected_deployments(selected_deployments, deployment_tasks = {})
def validate_deployed_network(error_handler: RaiseErrorHandler.new)
    # All checks report through the same error handler. They run in this
    # order: deployment of all tasks, existence of the configurations and
    # finally availability of the process managers
    handler_options = { error_handler: error_handler }

    verify_all_tasks_deployed(**handler_options)
    verify_all_configurations_exist(**handler_options)
    verify_all_process_managers_enabled(**handler_options)
end

# Verifies that all tasks in the plan are deployed
#
# @param [ResolutionErrorHandler | RaiseErrorHandler] error_handler
def verify_all_tasks_deployed(error_handler)
def verify_all_tasks_deployed(error_handler: RaiseErrorHandler.new)
self.class.verify_all_tasks_deployed(
plan, default_deployment_group, error_handler
plan, default_deployment_group, error_handler: error_handler
)
end

Expand Down Expand Up @@ -256,6 +257,36 @@ def self.verify_all_tasks_deployed(
end
end

# Report all tasks whose deployment uses an unavailable process manager
#
# Instance-level entry point: it forwards this object's plan and the error
# handler to the class-level method of the same name
#
# @param [#register_resolution_failures_from_exception] error_handler
def verify_all_process_managers_enabled(error_handler: RaiseErrorHandler.new)
    deployer_class = self.class
    deployer_class.verify_all_process_managers_enabled(
        plan, error_handler: error_handler
    )
end

# Report all tasks whose deployment uses an unavailable process manager
#
# One DeployedOnDisabledProcessManager error is registered on the error
# handler for each task executed by a deployment whose process server
# configuration reports that it is not available
#
# @param [Roby::Plan] plan
# @param [#register_resolution_failures_from_exception] error_handler
def self.verify_all_process_managers_enabled(
    plan, error_handler: RaiseErrorHandler.new
)
    # The offending deployments are fully listed before anything gets
    # reported to the error handler
    unavailable = plan.find_local_tasks(Deployment).reject do |deployment_task|
        deployment_task.process_server_config.available?
    end

    unavailable.each do |deployment_task|
        deployment_task.each_executed_task do |task|
            error = DeployedOnDisabledProcessManager.new(task, deployment_task)
            error_handler.register_resolution_failures_from_exception(
                task, error
            )
        end
    end
end

# Verifies that all selected configuration sections exist
#
# @return [Array<ResolutionError>] resolution errors of all the tasks that
Expand Down
32 changes: 27 additions & 5 deletions lib/syskit/process_managers/remote/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def inspect

# Value of @state for which #available? returns true
STATE_CONNECTED = "connected"
# Value of @state set at construction; #poll handles it by calling
# #poll_in_disconnected_state
STATE_DISCONNECTED = "disconnected"
# Value of @state handled in #poll by #poll_in_closed_state, which only
# cleans up a pending connection attempt
STATE_CLOSED = "closed"

def available?
@state == STATE_CONNECTED
Expand All @@ -85,12 +86,14 @@ def initialize(
Syskit.conf.remote_process_managers_response_timeout,
root_loader: Orocos.default_loader,
register_on_name_server: true,
connect_executor: :io
connect_executor: :io,
create_log_dir: true
)
@host = host
@port = port
@state = STATE_DISCONNECTED
@response_timeout = response_timeout
@create_log_dir = create_log_dir

@processes = {}
@death_queue = []
Expand Down Expand Up @@ -135,11 +138,22 @@ def connect

# Run the periodic processing that matches the manager's current state
#
# Only the closed and disconnected states have processing attached,
# nothing is done in any other state
def poll
    if @state == STATE_CLOSED
        poll_in_closed_state
    elsif @state == STATE_DISCONNECTED
        poll_in_disconnected_state
    end
end

# Clean up a connection attempt that is still registered on a closed manager
#
# Nothing is done if there is no connection future, or if the future does
# not return a result yet. Once a result is there, the socket it may
# contain is closed and the future is forgotten
def poll_in_closed_state
    outcome = @connect_future&.result(0)
    return unless outcome

    # The second element of the result is the socket, if there is one
    pending_socket = outcome[1]
    pending_socket&.close
    @connect_future = nil
end

def poll_in_disconnected_state
if @connect_future
return unless (result = @connect_future.result(0))
Expand Down Expand Up @@ -174,14 +188,22 @@ def handle_new_connection(socket)

@server_pid = pid
@loader = Loader.new(self, @root_loader)

if @create_log_dir
create_log_dir(
Roby.app.time_tag, { "parent" => Roby.app.app_metadata }
)
end
kill_all if Syskit.conf.kill_all_on_process_server_connection?

ProcessManagers.info "connected to remote process manager #{self}"
rescue StandardError => e
ProcessManagers.warn(
"got a socket to remote process manager #{self}, but the first " \
"call failed: #{e.message}"
)

close
close(state: STATE_DISCONNECTED)
schedule_connection_attempt
end

Expand Down Expand Up @@ -392,9 +414,9 @@ def disconnect
close
end

def close
@state = STATE_DISCONNECTED
@socket.close
def close(state: STATE_CLOSED)
@state = state
@socket&.close
end

def write_command(cmd, args = nil)
Expand Down
3 changes: 1 addition & 2 deletions lib/syskit/process_managers/remote/server/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def spawn_setup_forked_process_and_exec(ior_write_fd, control_read_fd)
ENV["ORO_LOGFILE"] = resolve_orocos_logger_output(pid)

::Process.setpgrp
debug "command line: #{@command} #{arguments.join(' ')}"
info "starting: #{@command} #{arguments.join(' ')}"
exec(@command, *arguments,
control_read_fd => control_read_fd,
ior_write_fd => ior_write_fd,
Expand Down Expand Up @@ -423,7 +423,6 @@ def kill(hard: false)
rescue Errno::ESRCH # rubocop:disable Lint/SuppressedException
end
else
puts "control FD"
@control_write_fd.write("Q")
end
end
Expand Down
13 changes: 8 additions & 5 deletions lib/syskit/roby_app/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -759,10 +759,6 @@ def register_remote_manager(
client = ProcessManagers::Remote::Manager.new(
host, port, root_loader: app.default_loader
)
client.create_log_dir(
Roby.app.time_tag, { "parent" => Roby.app.app_metadata }
)
client.kill_all if kill_all_on_process_server_connection?
config = register_process_server(
name, client,
host_id: host_id || name,
Expand All @@ -775,12 +771,19 @@ def register_remote_manager(

ProcessServerConfig =
Struct.new :name, :client, :log_dir, :host_id, :supports_log_transfer,
:logging_enabled, :register_on_name_server,
:logging_enabled, :register_on_name_server, :disabled,
keyword_init: true do
# Alternate name for the +client+ field
#
# @return the object stored in +client+
def manager
client
end

# Whether this process manager can currently be used
#
# A configuration flagged as disabled is never available. Otherwise, the
# answer is delegated to the client when it implements #available?, and
# clients that do not implement it are considered available
def available?
    if disabled
        false
    elsif client.respond_to?(:available?)
        client.available?
    else
        true
    end
end

# Whether the host ID is one of "localhost" or "syskit"
def on_localhost?
    %w[localhost syskit].include?(host_id)
end
Expand Down
25 changes: 3 additions & 22 deletions lib/syskit/roby_app/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -718,28 +718,9 @@ def self.create_local_process_server_client(app)
"no process server is being started"
end

# Wait for the server to be ready
client = nil
until client
client =
begin ProcessManagers::Remote::Manager.new("localhost", @server_port)
rescue Errno::ECONNREFUSED
sleep 0.1
is_running =
begin
!::Process.waitpid(@server_pid, ::Process::WNOHANG)
rescue Errno::ESRCH
false
end

unless is_running
raise ProcessManagers::Remote::Manager::StartupFailed,
"the local process server failed to start"
end

nil
end
end
client = ProcessManagers::Remote::Manager.new(
"localhost", @server_port, create_log_dir: false
)

# Verify that the server is actually ours (i.e. check that there
# was not one that was still running)
Expand Down
12 changes: 12 additions & 0 deletions lib/syskit/runtime/update_deployment_states.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,24 @@ def self.update_deployment_states(plan)
# #cleanup_dead_connections, thus avoiding to disconnect connections
# between already-dead processes

poll_managers
handle_dead_deployments(plan)
trigger_ready_deployments(plan)
end

# Call #poll on the client of every registered process server
# configuration, skipping the clients that do not respond to it
def self.poll_managers
    # Iterate over an array copy of the registered configurations
    Syskit.conf.each_process_server_config.to_a.each do |config|
        manager = config.client
        manager.poll if manager.respond_to?(:poll)
    end
end

def self.handle_dead_deployments(plan)
all_dead_deployments = Set.new
server_config = Syskit.conf.each_process_server_config.to_a
server_config.each do |config|
next unless config.available?

begin
dead_deployments = config.client.wait_termination
rescue ::Exception => e
Expand All @@ -42,6 +52,8 @@ def self.trigger_ready_deployments(plan)
not_ready_deployments = find_all_not_ready_deployments(plan)
not_ready_deployments.each do |process_server_name, deployments|
server_config = Syskit.conf.process_server_config_for(process_server_name)
next unless server_config.available?

wait_result = server_config.client.wait_running(
*deployments.map { |d| d.arguments[:process_name] }
)
Expand Down
6 changes: 5 additions & 1 deletion lib/syskit/scripts/process_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

require "optparse"

server_port = Syskit::ProcessManagers::Remote::DEFAULT_PORT

options = Hash[host: "localhost"]
parser = OptionParser.new do |opt|
opt.on "--fd=FD", Integer, "the socket that should be used as TCP server" do |fd|
Expand All @@ -13,12 +15,14 @@
opt.on "--log-dir=DIR", String, "the directory that should be used for logs" do |dir|
Roby.app.log_dir = dir
end
opt.on "--port=PORT", Integer, "the port to listen on" do |port|
server_port = port
end
opt.on("--debug", "turn on debug mode") do
Syskit::ProcessManagers::Remote::Server.logger.level = Logger::DEBUG
end
end

server_port = Syskit::ProcessManagers::Remote::DEFAULT_PORT
Roby::Application.host_options(parser, options)
parser.parse(ARGV)

Expand Down
6 changes: 3 additions & 3 deletions lib/syskit/test/network_manipulation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,14 @@ def syskit_stub_task_context_model(name, &block)
# @param [Boolean] register the configured deployment in the test group
# This makes it available to further deployments
# @return [Syskit::Models::ConfiguredDeployment]
def syskit_stub_configured_deployment(
def syskit_stub_configured_deployment( # rubocop:disable Metrics/ParameterLists
task_model = nil, task_name = syskit_default_stub_name(task_model),
remote_task: syskit_stub_resolves_remote_tasks?,
register: true, read_only: [], &block
on: "stubs", register: true, read_only: [], &block
)
configured_deployment = @__stubs.stub_configured_deployment(
task_model, task_name,
read_only: read_only, remote_task: remote_task, &block
on: on, read_only: read_only, remote_task: remote_task, &block
)
if register
@__test_deployment_group
Expand Down
9 changes: 5 additions & 4 deletions lib/syskit/test/stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,14 @@ def handle_abstract_component_model_in_requirements(model)
# @api private
#
# Computes a configured deployment suitable to deploy the given task model
def stub_configured_deployment(
def stub_configured_deployment( # rubocop:disable Metrics/ParameterLists
task_model = nil, task_name = default_stub_name,
remote_task: false, read_only: [], logger_name: nil, &block
remote_task: false, read_only: [], logger_name: nil,
on: "stubs", &block
)
deployment_model = stub_deployment_model(task_model, task_name, &block)

process_server = Syskit.conf.process_server_for("stubs")
process_server = Syskit.conf.process_server_for(on)
task_context_class =
if remote_task
Orocos::RubyTasks::RemoteTaskContext
Expand All @@ -300,7 +301,7 @@ def stub_configured_deployment(
end

Models::ConfiguredDeployment.new(
"stubs", deployment_model, { task_name => task_name }, task_name,
on, deployment_model, { task_name => task_name }, task_name,
Hash[task_context_class: task_context_class],
read_only: read_only, logger_name: logger_name
)
Expand Down
32 changes: 31 additions & 1 deletion test/network_generation/test_system_network_deployer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ def make_candidates(count)
deployment_m.orogen_model.task "task", task_m.orogen_model
plan.add(
deployment = deployment_m.new(
name_mappings: { "task" => "task" }
on: "localhost", name_mappings: { "task" => "task" }
)
)
plan.add(task = deployment.task("task"))
Expand All @@ -580,6 +580,36 @@ def make_candidates(count)
end
end
end

# Checks that deploying a task on a manager flagged as disabled is reported
# as a DeployedOnDisabledProcessManager resolution failure
describe "validation that all process managers are enabled" do
it "rejects tasks that are using a deployment " \
"from a disabled manager" do
# Register a dedicated manager and flag it as disabled
mng = register_ruby_tasks_manager("disabled_manager")
mng.disabled = true

# Explicitly select a stub deployment that runs on the disabled manager
task_m = Syskit::TaskContext.new_submodel
deployment = syskit_stub_configured_deployment(
task_m, on: "disabled_manager"
)
ir = task_m.to_instance_requirements
ir.use_configured_deployment(deployment)
task = ir.instanciate(plan)

# Use an error handler whose resolution failures can be inspected below
error_handler = ResolutionErrorHandler.new(
deployer.plan, deployer.merge_solver
)
execute do
deployer.deploy(error_handler: error_handler)
end

# Exactly one failure is expected. It must wrap the new exception and
# refer to the task that the merge solver reports as replacement for
# the instanciated one
assert_equal 1, error_handler.resolution_failures.size
e = error_handler.resolution_failures.first
assert_kind_of DeployedOnDisabledProcessManager,
e.original_exception
assert_equal deployer.merge_solver.replacement_for(task),
e.original_exception.task
end
end
end

def deployed_task_helper(model, name)
Expand Down
Loading