Skip to content

feat: capture output in roby app helpers #334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/roby/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,9 @@ def guess_app_dir
def require_app_dir(needs_current: false, allowed_outside: true)
guess_app_dir
unless app_dir
raise ArgumentError, "your current directory does not seem to be a Roby application directory; did you forget to run 'roby init'?"
raise ArgumentError,
"your current directory (#{Dir.pwd}) does not seem to be a Roby " \
"application directory; did you forget to run 'roby init'?"
end

if needs_current
Expand All @@ -400,7 +402,9 @@ def require_app_dir(needs_current: false, allowed_outside: true)
def needs_to_be_in_current_app(allowed_outside: true)
guessed_dir = self.class.guess_app_dir
if guessed_dir && (@app_dir != guessed_dir)
raise NotInCurrentApp, "#{@app_dir} is currently selected, but the current directory is within #{guessed_dir}"
raise NotInCurrentApp,
"#{@app_dir} is currently selected, " \
"but the current directory is within #{guessed_dir}"
elsif !guessed_dir && !allowed_outside
raise NotInCurrentApp, "not currently within an app dir"
end
Expand Down
13 changes: 11 additions & 2 deletions lib/roby/support.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ class << self
@enable_deprecation_warnings = true
@deprecation_warnings_are_errors = (ENV["ROBY_ALL_DEPRECATIONS_ARE_ERRORS"] == "1")

def self.warn_deprecated(msg, caller_depth = 1)
def self.warn_deprecated(msg, caller_depth = 5)
if deprecation_warnings_are_errors
error_deprecated(msg, caller_depth)
elsif enable_deprecation_warnings
Roby.warn "Deprecation Warning: #{msg} "\
"at #{caller[1, caller_depth].join("\n")}"
"at #{caller[1, caller_depth].join("\n ")}"
end
end

Expand Down Expand Up @@ -207,4 +207,13 @@ def self.which(cmd)

VoidClass = Class.new
Void = VoidClass.new.freeze

# Time in seconds since an arbitrary point in time, unaffected by time corrections
#
# Use this time for e.g. compute timeouts or durations since a certain timepoint
#
# @return [Float]
def self.monotonic_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
164 changes: 140 additions & 24 deletions lib/roby/test/roby_app_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def teardown
end

def kill_spawned_pids(
pids = @spawned_pids, signal: "INT", next_signal: "KILL", timeout: 5
pids = @spawned_pids.map(&:pid),
signal: "INT", next_signal: "KILL", timeout: 5
)
pending_children = pids.find_all do |pid|
begin
Expand Down Expand Up @@ -104,7 +105,13 @@ def assert_roby_app_is_running(
start_time = Time.now
while (Time.now - start_time) < timeout
if ::Process.waitpid(pid, Process::WNOHANG)
flunk "Roby app unexpectedly quit"
if (captured_output = roby_app_join_capture_thread(pid))
flunk "Roby app unexpectedly quit\n" \
"stdout=#{captured_output[:out]}\n" \
"stderr=#{captured_output[:err]}"
else
flunk "Roby app unexpectedly quit"
end
end

begin
Expand All @@ -128,6 +135,30 @@ def assert_roby_app_quits(pid, port: Interface::DEFAULT_PORT, interface: nil)
interface&.close if interface_owned
end

# Wait for a subprocess to exit
def assert_process_exits(pid, timeout: 20)
deadline = Time.now + timeout
while Time.now < deadline
_, status = Process.waitpid2(pid, Process::WNOHANG)
if status
roby_app_join_capture_thread(pid)
return status
end

sleep 0.01
end

if (output = roby_app_captured_output(pid))
flunk(
"process #{pid} did not quit within #{timeout} seconds\n" \
"stdout=#{output[:out]}\n" \
"stderr=#{output[:err]}"
)
else
flunk("process #{pid} did not quit within #{timeout} seconds")
end
end

# Wait for the app to exit
#
# Unlike {#assert_roby_app_quits}, this method does not explicitly
Expand All @@ -136,14 +167,29 @@ def assert_roby_app_quits(pid, port: Interface::DEFAULT_PORT, interface: nil)
#
# @see assert_roby_app_quits
def assert_roby_app_exits(pid, timeout: 20)
deadline = Time.now + timeout
while Time.now < deadline
_, status = Process.waitpid2(pid, Process::WNOHANG)
return status if status
assert_process_exits(pid, timeout: timeout)
end

sleep 0.01
# Return the output captured so far for the given PID
#
# If the process has stopped and {#roby_app_quit} or {#assert_roby_app_exits}
# was called, the output is complete. Otherwise it might be partial
#
# @return [nil,{out: String, err: String}] nil if the PID does not exist,
# or if roby_app_spawn was not configured to capture the output. Otherwise,
# a hash with the stdout and stderr strings
def roby_app_captured_output(pid)
return unless (spawned = @spawned_pids.find { |p| p.pid == pid })
return unless (queue = spawned.capture_queue)

outputs = spawned.captured_output
until queue.empty?
output, string = queue.pop
outputs[output] << string
end
flunk("app did not quit within #{timeout} seconds")

outputs.transform_values! { |arr| [arr.join] }
outputs.transform_values(&:first)
end

def assert_roby_app_has_job(
Expand All @@ -162,8 +208,18 @@ def assert_roby_app_has_job(
flunk "timed out while waiting for action #{action_name} on #{interface}"
end

def roby_app_join_capture_thread(pid)
return unless (spawned = @spawned_pids.find { |p| p.pid == pid })
return unless (thread = spawned.capture_thread)

thread.join
roby_app_captured_output(pid)
end

def roby_app_quit(interface, timeout: 2)
_, status = Process.waitpid2(pid)
roby_app_join_capture_thread(pid)

return if status.success?

raise "roby app with PID #{pid} exited with nonzero status"
Expand Down Expand Up @@ -196,42 +252,102 @@ def roby_app_setup_single_script(*scripts)
dir
end

def roby_app_allocate_interface_port
def roby_app_allocate_port
server = TCPServer.new(0)
server.local_address.ip_port
ensure
server&.close
end

def roby_app_allocate_interface_port
roby_app_allocate_port
end

ROBY_PORT_COMMANDS = %w[run].freeze
ROBY_NO_INTERFACE_COMMANDS = %w[wait check test].freeze

def register_roby_plugin(path)
@roby_plugin_path << path
end

SpawnedProcess = Struct.new(
:pid, :capture_thread, :capture_queue, :captured_output,
keyword_init: true
)

# @api private
#
# Start thread that pull data out of a process output pipes
def roby_app_spawn_output_capture_thread(out_r, err_r, queue)
ios = [out_r, err_r]
Thread.new do
until ios.empty?
with_events, = select(ios, [], [])
with_events.each do |io|
unless (data = io.read_nonblock(4096))
raise EOFError
end

queue.push([io == out_r ? :out : :err, data])
rescue EOFError
ios.delete(io)
io.close
rescue IO::WaitReadable
# Wait for more data
end
end
end
end

# @api private
#
# Helper to determine the "right" interface-related arguments in
# {#roby_app_spawn}
def roby_app_spawn_interface_args(command, port)
port ||= roby_app_allocate_port
if ROBY_PORT_COMMANDS.include?(command)
["--interface-versions=#{@roby_app_interface_version}",
"--port-v#{@roby_app_interface_version}", port.to_s]
elsif !ROBY_NO_INTERFACE_COMMANDS.include?(command)
["--interface-version=#{@roby_app_interface_version}",
"--host", "localhost:#{port}"]
end
end

# Spawn the roby app process
#
# @return [Integer] the app PID
def roby_app_spawn(command, *args, port: nil, silent: false, **options)
if silent
def roby_app_spawn( # rubocop:disable Metrics/ParameterLists
command, *args,
port: nil, capture_output: false, silent: false, env: {}, **options
)
if capture_output
out_r, out_w = IO.pipe
err_r, err_w = IO.pipe
capture_queue = Queue.new
capture_thread = roby_app_spawn_output_capture_thread(
out_r, err_r, capture_queue
)
options[:out] = out_w
options[:err] = err_w
elsif silent
options[:out] ||= "/dev/null"
options[:err] ||= "/dev/null"
end
port ||= roby_app_allocate_interface_port
port_args =
if ROBY_PORT_COMMANDS.include?(command)
["--interface-versions=#{@roby_app_interface_version}",
"--port-v#{@roby_app_interface_version}", port.to_s]
elsif !ROBY_NO_INTERFACE_COMMANDS.include?(command)
["--interface-version=#{@roby_app_interface_version}",
"--host", "localhost:#{port}"]
end

port_args = roby_app_spawn_interface_args(command, port)
pid = spawn(
{ "ROBY_PLUGIN_PATH" => @roby_plugin_path.join(":") },
{ "ROBY_PLUGIN_PATH" => @roby_plugin_path.join(":") }.merge(env),
roby_bin, command, *port_args, *args, chdir: app_dir, **options
)
@spawned_pids << pid
out_w&.close
err_w&.close
@spawned_pids << SpawnedProcess.new(
pid: pid,
capture_thread: capture_thread,
capture_queue: capture_queue,
captured_output: { out: [], err: [] }
)
pid
end

Expand All @@ -240,14 +356,14 @@ def roby_app_spawn(command, *args, port: nil, silent: false, **options)
# @return [(Integer,Roby::Interface::Client)] the app PID and connected
# roby interface
def roby_app_start(*args, port: nil, silent: false, **options)
port ||= roby_app_allocate_interface_port
port ||= roby_app_allocate_port
pid = roby_app_spawn(*args, port: port, silent: silent, **options)
interface = assert_roby_app_is_running(pid, port: port)
[pid, interface]
end

def register_pid(pid)
@spawned_pids << pid
@spawned_pids << SpawnedProcess.new(pid: pid)
end

def roby_app_run(*args, port: nil, silent: false, **options)
Expand Down