Skip to content

Commit 0254c79

Browse files
committed
fix(tests): drain runner output before ready marker
1 parent bb206de commit 0254c79

1 file changed

Lines changed: 132 additions & 95 deletions

File tree

ghostscope/tests/common/runner.rs

Lines changed: 132 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ use std::ffi::OsString;
2222
use std::future::Future;
2323
use std::path::{Path, PathBuf};
2424
use tempfile::{Builder, NamedTempFile};
25-
use tokio::io::{AsyncBufReadExt, BufReader};
25+
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
2626
use tokio::process::Command;
27+
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
28+
use tokio::task::JoinHandle;
2729
use tokio::time::{timeout, Duration};
2830

2931
const ENV_GHOSTSCOPE_LOG_LEVEL: &str = "E2E_GHOSTSCOPE_LOG_LEVEL";
@@ -258,7 +260,7 @@ impl GhostscopeRunner {
258260
let stdout_handle = child.stdout.take().unwrap();
259261
let stderr_handle = child.stderr.take().unwrap();
260262
let mut stdout_reader = BufReader::new(stdout_handle);
261-
let mut stderr_reader = BufReader::new(stderr_handle);
263+
let stderr_reader = BufReader::new(stderr_handle);
262264

263265
let mut stdout_content = String::new();
264266
let mut stderr_content = String::new();
@@ -304,58 +306,27 @@ impl GhostscopeRunner {
304306
})?);
305307
}
306308

309+
let (output_tx, mut output_rx) = mpsc::unbounded_channel();
310+
let stdout_task =
311+
spawn_output_reader(stdout_reader, OutputStream::Stdout, output_tx.clone());
312+
let stderr_task = spawn_output_reader(stderr_reader, OutputStream::Stderr, output_tx);
313+
307314
if !ready_fired {
308315
let pre_ready_task = async {
309-
let mut stdout_line = String::new();
310-
let mut stderr_line = String::new();
311316
loop {
312-
stdout_line.clear();
313-
if let Ok(Ok(n)) = timeout(
314-
Duration::from_millis(50),
315-
stdout_reader.read_line(&mut stdout_line),
316-
)
317-
.await
318-
{
319-
if n > 0 {
320-
stdout_content.push_str(&stdout_line);
321-
if ready_marker
322-
.as_deref()
323-
.is_some_and(|marker| stdout_line.trim_end() == marker)
324-
{
325-
if runner_debug {
326-
eprintln!(
327-
"[ghostscope-test-runner] observed ready marker on stdout from {}",
328-
sandbox.label()
329-
);
330-
}
331-
ready_fired = true;
332-
break;
333-
}
334-
}
335-
}
336-
337-
stderr_line.clear();
338-
if let Ok(Ok(n)) = timeout(
339-
Duration::from_millis(50),
340-
stderr_reader.read_line(&mut stderr_line),
341-
)
342-
.await
317+
if let Ok(Some(output_line)) =
318+
timeout(Duration::from_millis(100), output_rx.recv()).await
343319
{
344-
if n > 0 {
345-
stderr_content.push_str(&stderr_line);
346-
if ready_marker
347-
.as_deref()
348-
.is_some_and(|marker| stderr_line.trim_end() == marker)
349-
{
350-
if runner_debug {
351-
eprintln!(
352-
"[ghostscope-test-runner] observed ready marker on stderr from {}",
353-
sandbox.label()
354-
);
355-
}
356-
ready_fired = true;
357-
break;
358-
}
320+
if handle_output_line(
321+
output_line,
322+
&mut stdout_content,
323+
&mut stderr_content,
324+
ready_marker.as_deref(),
325+
runner_debug,
326+
&sandbox,
327+
) {
328+
ready_fired = true;
329+
break;
359330
}
360331
}
361332

@@ -457,31 +428,18 @@ impl GhostscopeRunner {
457428

458429
let timed_out = if ready_fired {
459430
let read_task = async {
460-
let mut stdout_line = String::new();
461-
let mut stderr_line = String::new();
462431
loop {
463-
stdout_line.clear();
464-
if let Ok(Ok(n)) = timeout(
465-
Duration::from_millis(50),
466-
stdout_reader.read_line(&mut stdout_line),
467-
)
468-
.await
469-
{
470-
if n > 0 {
471-
stdout_content.push_str(&stdout_line);
472-
}
473-
}
474-
475-
stderr_line.clear();
476-
if let Ok(Ok(n)) = timeout(
477-
Duration::from_millis(50),
478-
stderr_reader.read_line(&mut stderr_line),
479-
)
480-
.await
432+
if let Ok(Some(output_line)) =
433+
timeout(Duration::from_millis(100), output_rx.recv()).await
481434
{
482-
if n > 0 {
483-
stderr_content.push_str(&stderr_line);
484-
}
435+
handle_output_line(
436+
output_line,
437+
&mut stdout_content,
438+
&mut stderr_content,
439+
ready_marker.as_deref(),
440+
runner_debug,
441+
&sandbox,
442+
);
485443
}
486444

487445
if let Ok(Some(_status)) = child.try_wait() {
@@ -540,28 +498,16 @@ impl GhostscopeRunner {
540498
}
541499
};
542500

543-
{
544-
let mut line = String::new();
545-
loop {
546-
line.clear();
547-
match stdout_reader.read_line(&mut line).await {
548-
Ok(0) => break,
549-
Ok(_) => stdout_content.push_str(&line),
550-
Err(_) => break,
551-
}
552-
}
553-
}
554-
{
555-
let mut line = String::new();
556-
loop {
557-
line.clear();
558-
match stderr_reader.read_line(&mut line).await {
559-
Ok(0) => break,
560-
Ok(_) => stderr_content.push_str(&line),
561-
Err(_) => break,
562-
}
563-
}
564-
}
501+
let _ = stdout_task.await;
502+
let _ = stderr_task.await;
503+
drain_output_channel(
504+
&mut output_rx,
505+
&mut stdout_content,
506+
&mut stderr_content,
507+
ready_marker.as_deref(),
508+
runner_debug,
509+
&sandbox,
510+
);
565511

566512
if exit_code == -1 && (!stdout_content.is_empty() || !stderr_content.is_empty()) {
567513
exit_code = 0;
@@ -655,6 +601,97 @@ struct EffectiveLoggingConfig {
655601
enable_console_logging: bool,
656602
}
657603

604+
#[derive(Clone, Copy)]
605+
enum OutputStream {
606+
Stdout,
607+
Stderr,
608+
}
609+
610+
struct OutputLine {
611+
stream: OutputStream,
612+
line: String,
613+
}
614+
615+
fn spawn_output_reader<R>(
616+
mut reader: BufReader<R>,
617+
stream: OutputStream,
618+
tx: UnboundedSender<OutputLine>,
619+
) -> JoinHandle<()>
620+
where
621+
R: AsyncRead + Unpin + Send + 'static,
622+
{
623+
tokio::spawn(async move {
624+
let mut line = String::new();
625+
loop {
626+
line.clear();
627+
match reader.read_line(&mut line).await {
628+
Ok(0) => break,
629+
Ok(_) => {
630+
if tx
631+
.send(OutputLine {
632+
stream,
633+
line: line.clone(),
634+
})
635+
.is_err()
636+
{
637+
break;
638+
}
639+
}
640+
Err(_) => break,
641+
}
642+
}
643+
})
644+
}
645+
646+
fn handle_output_line(
647+
output_line: OutputLine,
648+
stdout_content: &mut String,
649+
stderr_content: &mut String,
650+
ready_marker: Option<&str>,
651+
runner_debug: bool,
652+
sandbox: &SandboxHandle,
653+
) -> bool {
654+
let target = match output_line.stream {
655+
OutputStream::Stdout => stdout_content,
656+
OutputStream::Stderr => stderr_content,
657+
};
658+
target.push_str(&output_line.line);
659+
660+
let saw_ready = ready_marker.is_some_and(|marker| output_line.line.trim_end() == marker);
661+
if saw_ready && runner_debug {
662+
let stream = match output_line.stream {
663+
OutputStream::Stdout => "stdout",
664+
OutputStream::Stderr => "stderr",
665+
};
666+
eprintln!(
667+
"[ghostscope-test-runner] observed ready marker on {stream} from {}",
668+
sandbox.label()
669+
);
670+
}
671+
672+
saw_ready
673+
}
674+
675+
fn drain_output_channel(
676+
output_rx: &mut UnboundedReceiver<OutputLine>,
677+
stdout_content: &mut String,
678+
stderr_content: &mut String,
679+
ready_marker: Option<&str>,
680+
runner_debug: bool,
681+
sandbox: &SandboxHandle,
682+
) {
683+
while let Ok(output_line) = output_rx.try_recv() {
684+
handle_output_line(
685+
output_line,
686+
stdout_content,
687+
stderr_content,
688+
ready_marker,
689+
runner_debug,
690+
sandbox,
691+
);
692+
}
693+
}
694+
658695
fn create_script_file() -> Result<NamedTempFile> {
659696
let repo_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
660697
.parent()

0 commit comments

Comments
 (0)