Skip to content

Commit 27a4fe6

Browse files
authored
Merge pull request #334 from rock-core/capture_output_in_roby_app_helpers
feat: capture output in roby app helpers
2 parents c7b3a08 + 506b01f commit 27a4fe6

File tree

3 files changed

+157
-28
lines changed

3 files changed

+157
-28
lines changed

lib/roby/app.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,9 @@ def guess_app_dir
384384
def require_app_dir(needs_current: false, allowed_outside: true)
385385
guess_app_dir
386386
unless app_dir
387-
raise ArgumentError, "your current directory does not seem to be a Roby application directory; did you forget to run 'roby init'?"
387+
raise ArgumentError,
388+
"your current directory (#{Dir.pwd}) does not seem to be a Roby " \
389+
"application directory; did you forget to run 'roby init'?"
388390
end
389391

390392
if needs_current
@@ -400,7 +402,9 @@ def require_app_dir(needs_current: false, allowed_outside: true)
400402
def needs_to_be_in_current_app(allowed_outside: true)
401403
guessed_dir = self.class.guess_app_dir
402404
if guessed_dir && (@app_dir != guessed_dir)
403-
raise NotInCurrentApp, "#{@app_dir} is currently selected, but the current directory is within #{guessed_dir}"
405+
raise NotInCurrentApp,
406+
"#{@app_dir} is currently selected, " \
407+
"but the current directory is within #{guessed_dir}"
404408
elsif !guessed_dir && !allowed_outside
405409
raise NotInCurrentApp, "not currently within an app dir"
406410
end

lib/roby/support.rb

+11-2
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,12 @@ class << self
166166
@enable_deprecation_warnings = true
167167
@deprecation_warnings_are_errors = (ENV["ROBY_ALL_DEPRECATIONS_ARE_ERRORS"] == "1")
168168

169-
def self.warn_deprecated(msg, caller_depth = 1)
169+
def self.warn_deprecated(msg, caller_depth = 5)
170170
if deprecation_warnings_are_errors
171171
error_deprecated(msg, caller_depth)
172172
elsif enable_deprecation_warnings
173173
Roby.warn "Deprecation Warning: #{msg} "\
174-
"at #{caller[1, caller_depth].join("\n")}"
174+
"at #{caller[1, caller_depth].join("\n ")}"
175175
end
176176
end
177177

@@ -207,4 +207,13 @@ def self.which(cmd)
207207

208208
VoidClass = Class.new
209209
Void = VoidClass.new.freeze
210+
211+
# Time in seconds since an arbitrary point in time, unaffected by time corrections
212+
#
213+
# Use this time for e.g. compute timeouts or durations since a certain timepoint
214+
#
215+
# @return [Float]
216+
def self.monotonic_time
217+
Process.clock_gettime(Process::CLOCK_MONOTONIC)
218+
end
210219
end

lib/roby/test/roby_app_helpers.rb

+140-24
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ def teardown
3131
end
3232

3333
def kill_spawned_pids(
34-
pids = @spawned_pids, signal: "INT", next_signal: "KILL", timeout: 5
34+
pids = @spawned_pids.map(&:pid),
35+
signal: "INT", next_signal: "KILL", timeout: 5
3536
)
3637
pending_children = pids.find_all do |pid|
3738
begin
@@ -104,7 +105,13 @@ def assert_roby_app_is_running(
104105
start_time = Time.now
105106
while (Time.now - start_time) < timeout
106107
if ::Process.waitpid(pid, Process::WNOHANG)
107-
flunk "Roby app unexpectedly quit"
108+
if (captured_output = roby_app_join_capture_thread(pid))
109+
flunk "Roby app unexpectedly quit\n" \
110+
"stdout=#{captured_output[:out]}\n" \
111+
"stderr=#{captured_output[:err]}"
112+
else
113+
flunk "Roby app unexpectedly quit"
114+
end
108115
end
109116

110117
begin
@@ -128,6 +135,30 @@ def assert_roby_app_quits(pid, port: Interface::DEFAULT_PORT, interface: nil)
128135
interface&.close if interface_owned
129136
end
130137

138+
# Wait for a subprocess to exit
139+
def assert_process_exits(pid, timeout: 20)
140+
deadline = Time.now + timeout
141+
while Time.now < deadline
142+
_, status = Process.waitpid2(pid, Process::WNOHANG)
143+
if status
144+
roby_app_join_capture_thread(pid)
145+
return status
146+
end
147+
148+
sleep 0.01
149+
end
150+
151+
if (output = roby_app_captured_output(pid))
152+
flunk(
153+
"process #{pid} did not quit within #{timeout} seconds\n" \
154+
"stdout=#{output[:out]}\n" \
155+
"stderr=#{output[:err]}"
156+
)
157+
else
158+
flunk("process #{pid} did not quit within #{timeout} seconds")
159+
end
160+
end
161+
131162
# Wait for the app to exit
132163
#
133164
# Unlike {#assert_roby_app_quits}, this method does not explicitly
@@ -136,14 +167,29 @@ def assert_roby_app_quits(pid, port: Interface::DEFAULT_PORT, interface: nil)
136167
#
137168
# @see assert_roby_app_quits
138169
def assert_roby_app_exits(pid, timeout: 20)
139-
deadline = Time.now + timeout
140-
while Time.now < deadline
141-
_, status = Process.waitpid2(pid, Process::WNOHANG)
142-
return status if status
170+
assert_process_exits(pid, timeout: timeout)
171+
end
143172

144-
sleep 0.01
173+
# Return the output captured so far for the given PID
174+
#
175+
# If the process has stopped and {#roby_app_quit} or {#assert_roby_app_exits}
176+
# was called, the output is complete. Otherwise it might be partial
177+
#
178+
# @return [nil,{out: String, err: String}] nil if the PID does not exist,
179+
# or if roby_app_spawn was not configured to capture the output. Otherwise,
180+
# a hash with the stdout and stderr strings
181+
def roby_app_captured_output(pid)
182+
return unless (spawned = @spawned_pids.find { |p| p.pid == pid })
183+
return unless (queue = spawned.capture_queue)
184+
185+
outputs = spawned.captured_output
186+
until queue.empty?
187+
output, string = queue.pop
188+
outputs[output] << string
145189
end
146-
flunk("app did not quit within #{timeout} seconds")
190+
191+
outputs.transform_values! { |arr| [arr.join] }
192+
outputs.transform_values(&:first)
147193
end
148194

149195
def assert_roby_app_has_job(
@@ -162,8 +208,18 @@ def assert_roby_app_has_job(
162208
flunk "timed out while waiting for action #{action_name} on #{interface}"
163209
end
164210

211+
def roby_app_join_capture_thread(pid)
212+
return unless (spawned = @spawned_pids.find { |p| p.pid == pid })
213+
return unless (thread = spawned.capture_thread)
214+
215+
thread.join
216+
roby_app_captured_output(pid)
217+
end
218+
165219
def roby_app_quit(interface, timeout: 2)
166220
_, status = Process.waitpid2(pid)
221+
roby_app_join_capture_thread(pid)
222+
167223
return if status.success?
168224

169225
raise "roby app with PID #{pid} exited with nonzero status"
@@ -196,42 +252,102 @@ def roby_app_setup_single_script(*scripts)
196252
dir
197253
end
198254

199-
def roby_app_allocate_interface_port
255+
def roby_app_allocate_port
200256
server = TCPServer.new(0)
201257
server.local_address.ip_port
202258
ensure
203259
server&.close
204260
end
205261

262+
def roby_app_allocate_interface_port
263+
roby_app_allocate_port
264+
end
265+
206266
ROBY_PORT_COMMANDS = %w[run].freeze
207267
ROBY_NO_INTERFACE_COMMANDS = %w[wait check test].freeze
208268

209269
def register_roby_plugin(path)
210270
@roby_plugin_path << path
211271
end
212272

273+
SpawnedProcess = Struct.new(
274+
:pid, :capture_thread, :capture_queue, :captured_output,
275+
keyword_init: true
276+
)
277+
278+
# @api private
279+
#
280+
# Start thread that pull data out of a process output pipes
281+
def roby_app_spawn_output_capture_thread(out_r, err_r, queue)
282+
ios = [out_r, err_r]
283+
Thread.new do
284+
until ios.empty?
285+
with_events, = select(ios, [], [])
286+
with_events.each do |io|
287+
unless (data = io.read_nonblock(4096))
288+
raise EOFError
289+
end
290+
291+
queue.push([io == out_r ? :out : :err, data])
292+
rescue EOFError
293+
ios.delete(io)
294+
io.close
295+
rescue IO::WaitReadable
296+
# Wait for more data
297+
end
298+
end
299+
end
300+
end
301+
302+
# @api private
303+
#
304+
# Helper to determine the "right" interface-related arguments in
305+
# {#roby_app_spawn}
306+
def roby_app_spawn_interface_args(command, port)
307+
port ||= roby_app_allocate_port
308+
if ROBY_PORT_COMMANDS.include?(command)
309+
["--interface-versions=#{@roby_app_interface_version}",
310+
"--port-v#{@roby_app_interface_version}", port.to_s]
311+
elsif !ROBY_NO_INTERFACE_COMMANDS.include?(command)
312+
["--interface-version=#{@roby_app_interface_version}",
313+
"--host", "localhost:#{port}"]
314+
end
315+
end
316+
213317
# Spawn the roby app process
214318
#
215319
# @return [Integer] the app PID
216-
def roby_app_spawn(command, *args, port: nil, silent: false, **options)
217-
if silent
320+
def roby_app_spawn( # rubocop:disable Metrics/ParameterLists
321+
command, *args,
322+
port: nil, capture_output: false, silent: false, env: {}, **options
323+
)
324+
if capture_output
325+
out_r, out_w = IO.pipe
326+
err_r, err_w = IO.pipe
327+
capture_queue = Queue.new
328+
capture_thread = roby_app_spawn_output_capture_thread(
329+
out_r, err_r, capture_queue
330+
)
331+
options[:out] = out_w
332+
options[:err] = err_w
333+
elsif silent
218334
options[:out] ||= "/dev/null"
219335
options[:err] ||= "/dev/null"
220336
end
221-
port ||= roby_app_allocate_interface_port
222-
port_args =
223-
if ROBY_PORT_COMMANDS.include?(command)
224-
["--interface-versions=#{@roby_app_interface_version}",
225-
"--port-v#{@roby_app_interface_version}", port.to_s]
226-
elsif !ROBY_NO_INTERFACE_COMMANDS.include?(command)
227-
["--interface-version=#{@roby_app_interface_version}",
228-
"--host", "localhost:#{port}"]
229-
end
337+
338+
port_args = roby_app_spawn_interface_args(command, port)
230339
pid = spawn(
231-
{ "ROBY_PLUGIN_PATH" => @roby_plugin_path.join(":") },
340+
{ "ROBY_PLUGIN_PATH" => @roby_plugin_path.join(":") }.merge(env),
232341
roby_bin, command, *port_args, *args, chdir: app_dir, **options
233342
)
234-
@spawned_pids << pid
343+
out_w&.close
344+
err_w&.close
345+
@spawned_pids << SpawnedProcess.new(
346+
pid: pid,
347+
capture_thread: capture_thread,
348+
capture_queue: capture_queue,
349+
captured_output: { out: [], err: [] }
350+
)
235351
pid
236352
end
237353

@@ -240,14 +356,14 @@ def roby_app_spawn(command, *args, port: nil, silent: false, **options)
240356
# @return [(Integer,Roby::Interface::Client)] the app PID and connected
241357
# roby interface
242358
def roby_app_start(*args, port: nil, silent: false, **options)
243-
port ||= roby_app_allocate_interface_port
359+
port ||= roby_app_allocate_port
244360
pid = roby_app_spawn(*args, port: port, silent: silent, **options)
245361
interface = assert_roby_app_is_running(pid, port: port)
246362
[pid, interface]
247363
end
248364

249365
def register_pid(pid)
250-
@spawned_pids << pid
366+
@spawned_pids << SpawnedProcess.new(pid: pid)
251367
end
252368

253369
def roby_app_run(*args, port: nil, silent: false, **options)

0 commit comments

Comments
 (0)