Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/communication/handlers/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ impl StartupHandler {
Ok(streams) => streams,
Err(why) => {
window.write_to_command_line(&why.to_string())?;
build_streams_from_input(&[command.to_owned()], false).unwrap()
match build_streams_from_input(&[command.to_owned()], false) {
Ok(streams) => streams,
Err(why) => {
window.write_to_command_line(&why.to_string())?;
return Ok(());
}
}
}
};
window.config.stream_type = StdErr;
Expand Down Expand Up @@ -204,7 +210,7 @@ mod startup_tests {
}

#[test]
fn doesnt_crash_alpha() {
fn doesnt_crash_invalid_command_startup() {
// Setup dummy window
let mut window = MainWindow::_new_dummy();
window.config.stream_type = StreamType::Auxiliary;
Expand All @@ -219,8 +225,8 @@ mod startup_tests {
.process_command(&mut window, "zzzfake_file_name")
.is_ok()
);
assert!(matches!(window.input_type, InputType::Normal));
assert!(matches!(window.config.stream_type, StreamType::StdErr));
assert!(matches!(window.input_type, InputType::Startup));
assert!(matches!(window.config.stream_type, StreamType::Auxiliary));
Session::del(&[Session::list_full().len() - 1]).unwrap();
}
}
88 changes: 48 additions & 40 deletions src/communication/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::{
fs::File,
io::{BufRead, BufReader},
path::Path,
process::{Command, Stdio},
process::{Child, Command, Stdio},
result::Result,
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, channel},
},
thread, time,
Expand All @@ -31,8 +32,10 @@ use std::{
pub struct InputStream {
pub stdout: Receiver<String>,
pub stderr: Receiver<String>,
pub should_die: Arc<Mutex<bool>>,
pub should_die: Arc<AtomicBool>,
pub _type: String,
pub handle: Option<thread::JoinHandle<()>>,
pub child: Option<Child>,
}

pub trait Input {
Expand Down Expand Up @@ -86,8 +89,10 @@ impl Input for FileInput {
Ok(InputStream {
stdout: out_rx,
stderr: err_rx,
should_die: Arc::new(Mutex::new(false)),
should_die: Arc::new(AtomicBool::new(false)),
_type: String::from("FileInput"),
handle: None, // No handle needed for file input
child: None, // No child process for file input
})
}
}
Expand All @@ -105,52 +110,61 @@ impl CommandInput {
impl Input for CommandInput {
/// Create a command input
fn build(name: String, command: String) -> Result<InputStream, LogriaError> {
let command_to_run = CommandInput::parse_command(&command);
let mut child = match Command::new(command_to_run[0])
.args(&command_to_run[1..])
.current_dir(current_dir().unwrap())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null())
.spawn()
{
Ok(child) => child,
Err(why) => {
return Err(LogriaError::InvalidCommand(format!(
"Unable to connect to process: {why}"
)));
}
};

// Get stdout and stderr handles
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();

// Setup multiprocessing queues
let (err_tx, err_rx) = channel();
let (out_tx, out_rx) = channel();

// Provide check for termination outside of the thread
let should_die = Arc::new(Mutex::new(false));
let die = should_die.clone();
let should_die = Arc::new(AtomicBool::new(false));
let should_die_clone = Arc::clone(&should_die);

// Handle poll rate for each stream
let poll_rate_stdout = Arc::new(Mutex::new(RollingMean::new(5)));
let poll_rate_stderr = Arc::new(Mutex::new(RollingMean::new(5)));

// Start reading from the queues
let _ = thread::Builder::new()
let handle = thread::Builder::new()
.name(format!("CommandInput: {name}"))
.spawn(move || {
let command_to_run = CommandInput::parse_command(&command);
let mut child = match Command::new(command_to_run[0])
.args(&command_to_run[1..])
.current_dir(current_dir().unwrap())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null())
.spawn()
{
Ok(child) => child,
Err(why) => panic!("Unable to connect to process: {why}"),
};

// Get stdout and stderr handles
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();

// Create readers
let mut stdout_reader = BufReader::new(stdout);
let mut stderr_reader = BufReader::new(stderr);

// Create threads to read stdout and stderr independently
let die_clone = die.clone();
let die_clone = Arc::clone(&should_die_clone);
let poll_stdout = poll_rate_stdout.clone();
let stdout_handle = thread::spawn(move || {
loop {
thread::sleep(time::Duration::from_millis(
poll_stdout.lock().unwrap().mean(),
));

// Exit if the process is requested to die
if die_clone.load(Ordering::Relaxed) {
break;
}

let mut buf_stdout = String::new();
let timestamp = time::Instant::now();
stdout_reader.read_line(&mut buf_stdout).unwrap();
Expand All @@ -171,21 +185,22 @@ impl Input for CommandInput {
.lock()
.unwrap()
.update(ms_per_message(timestamp.elapsed(), 1));

if *die_clone.lock().unwrap() {
break;
}
}
});

let die_clone = die.clone();
let die_clone = Arc::clone(&should_die_clone);
let poll_stderr = poll_rate_stderr.clone();
let stderr_handle = thread::spawn(move || {
loop {
thread::sleep(time::Duration::from_millis(
poll_stderr.lock().unwrap().mean(),
));

// Exit if the process is requested to die
if die_clone.load(Ordering::Relaxed) {
break;
}

let mut buf_stderr = String::new();
let timestamp = time::Instant::now();
stderr_reader.read_line(&mut buf_stderr).unwrap();
Expand All @@ -206,29 +221,22 @@ impl Input for CommandInput {
.lock()
.unwrap()
.update(ms_per_message(timestamp.elapsed(), 1));

if *die_clone.lock().unwrap() {
break;
}
}
});

// Wait for both readers to complete
stdout_handle.join().unwrap();
stderr_handle.join().unwrap();

// Kill the child process if requested
if *die.lock().unwrap() {
let _ = child.kill();
}
let _ = child.wait();
});
})
.unwrap();

Ok(InputStream {
stdout: out_rx,
stderr: err_rx,
should_die,
_type: String::from("CommandInput"),
handle: Some(handle),
child: Some(child),
})
}
}
Expand Down
28 changes: 25 additions & 3 deletions src/communication/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
cmp::max,
io::{Result, Write, stdout},
panic,
sync::atomic::Ordering,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -123,6 +124,8 @@ pub struct LogriaConfig {
pub current_status: Option<String>,
/// Function that can generate messages for display
pub generate_auxiliary_messages: Option<fn() -> Vec<String>>,
/// False if the app should continue running, True if it should stop
pub should_exit: bool,
}

pub struct MainWindow {
Expand Down Expand Up @@ -244,6 +247,7 @@ impl MainWindow {
generate_auxiliary_messages: None,
current_status: None,
message_speed_tracker: RollingMean::new(5),
should_exit: false,
},
}
}
Expand Down Expand Up @@ -805,7 +809,7 @@ impl MainWindow {
Ok(streams) => self.config.streams = streams,
Err(why) => {
self.write_to_command_line(&why.to_string())?;
build_streams_from_input(&c, false).unwrap();
self.config.streams = build_streams_from_input(&c, false).unwrap();
}
}

Expand Down Expand Up @@ -837,9 +841,22 @@ impl MainWindow {
execute!(stdout(), cursor::Show, Clear(ClearType::All))?;
disable_raw_mode()?;
for stream in &self.config.streams {
*stream.should_die.lock().unwrap() = true;
stream.should_die.store(true, Ordering::Relaxed);
}
std::process::exit(0);

for mut stream in self.config.streams.drain(..) {
if let Some(mut child) = stream.child.take() {
// Exit the child process's blocking read
let _ = child.kill();
}
if let Some(handle) = stream.handle.take() {
// Wait for the thread to finish
let _ = handle.join();
}
}

self.config.should_exit = true;
Ok(())
}

/// Update stderr and stdout buffers from every stream's queue
Expand Down Expand Up @@ -903,12 +920,17 @@ impl MainWindow {
let num_new_messages = self.receive_streams();
self.handle_smart_poll_rate(self.config.loop_time.elapsed(), num_new_messages);

if self.config.should_exit {
break Ok(());
}

if poll(Duration::from_millis(self.config.poll_rate))? {
match read()? {
Event::Key(input) => {
// Die on Ctrl-C
if input == exit_key {
self.quit()?;
return Ok(());
}

// Otherwise, match input to action
Expand Down
2 changes: 1 addition & 1 deletion src/util/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Display for LogriaError {
write!(fmt, "Invalid format description: {msg}")
}
LogriaError::InvalidCommand(msg) => {
write!(fmt, "Invalid poll command: {msg}")
write!(fmt, "Invalid command: {msg}")
}
LogriaError::CannotParseMessage(msg) => {
write!(fmt, "Unable to parse message: {msg}")
Expand Down