Skip to content

Commit 22fcbef

Browse files
authored
Remove GhciStdin process (#45)
This removes the process that manages `GhciStdin` and instead replaces the handle to that process with the `GhciStdin` struct. The motivation behind this change is to remove the need for a `StdinEvent` type and avoid the need to spawn and monitor/restart a process. This also simplifies method invocations on that process, too.
1 parent 4dfc268 commit 22fcbef

2 files changed

Lines changed: 36 additions & 159 deletions

File tree

src/ghci/mod.rs

Lines changed: 27 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use tracing::instrument;
2626

2727
mod stdin;
2828
use stdin::GhciStdin;
29-
use stdin::StdinEvent;
3029

3130
mod stdout;
3231
use stdout::GhciStdout;
@@ -62,13 +61,10 @@ pub struct Ghci {
6261
/// The running `ghci` process.
6362
process: Child,
6463
/// The handle for the stdout reader task.
65-
stdout: JoinHandle<miette::Result<()>>,
64+
stdout_handle: JoinHandle<miette::Result<()>>,
6665
/// The handle for the stderr reader task.
67-
stderr: JoinHandle<miette::Result<()>>,
68-
/// The handle for the stdin interaction task.
69-
stdin: JoinHandle<miette::Result<()>>,
70-
/// A channel for sending events to interact with the stdin task.
71-
stdin_channel: mpsc::Sender<StdinEvent>,
66+
stderr_handle: JoinHandle<miette::Result<()>>,
67+
stdin: GhciStdin,
7268
/// Count of 'sync' events sent. This lets us sync stdin/stdout -- we write a message to stdin
7369
/// instructing `ghci` to print a sentinel string, and wait to read that string on `stdout`.
7470
sync_count: AtomicUsize,
@@ -121,7 +117,6 @@ impl Ghci {
121117
let stderr = child.stderr.take().unwrap();
122118

123119
// TODO: Is this a good capacity? Maybe it should just be 1.
124-
let (stdin_sender, stdin_receiver) = mpsc::channel(8);
125120
let (stdout_sender, stdout_receiver) = mpsc::channel(8);
126121
let (stderr_sender, stderr_receiver) = mpsc::channel(8);
127122

@@ -130,23 +125,28 @@ impl Ghci {
130125
// then create weak pointers to it and swap out the tasks.
131126
let stdout_handle = task::spawn(async { Ok(()) });
132127
let stderr_handle = task::spawn(async { Ok(()) });
133-
let stdin_handle = task::spawn(async { Ok(()) });
128+
129+
let stdin =
130+
GhciStdin {
131+
stdin,
132+
stdout_sender: stdout_sender.clone(),
133+
stderr_sender: stderr_sender.clone(),
134+
};
134135

135136
let mut ret = Ghci {
136137
command: command_arc,
137138
process: child,
138-
stdout: stdout_handle,
139-
stderr: stderr_handle,
140-
stdin: stdin_handle,
141-
stdin_channel: stdin_sender.clone(),
139+
stdout_handle,
140+
stderr_handle,
141+
stdin,
142142
sync_count: AtomicUsize::new(0),
143143
modules: Default::default(),
144144
error_path: error_path.clone(),
145145
setup_commands: setup_commands.clone(),
146146
test_command,
147147
};
148148

149-
// Three tasks for my three beautiful streams.
149+
// Two tasks for my two beautiful streams.
150150
let stdout = task::spawn(
151151
GhciStdout {
152152
reader: IncrementalReader::new(stdout).with_writer(tokio::io::stdout()),
@@ -173,21 +173,11 @@ impl Ghci {
173173
}
174174
.run(),
175175
);
176-
let stdin = task::spawn(
177-
GhciStdin {
178-
stdin,
179-
stdout_sender: stdout_sender.clone(),
180-
stderr_sender: stderr_sender.clone(),
181-
receiver: stdin_receiver,
182-
}
183-
.run(),
184-
);
185176

186177
// Now, replace the `JoinHandle`s with the actual values.
187178
{
188-
ret.stdout = stdout;
189-
ret.stderr = stderr;
190-
ret.stdin = stdin;
179+
ret.stdout_handle = stdout;
180+
ret.stderr_handle = stderr;
191181
};
192182

193183
// Wait for the stdout job to start up.
@@ -207,13 +197,7 @@ impl Ghci {
207197
let span = tracing::debug_span!("Start-of-session initialization");
208198
let _enter = span.enter();
209199
let (sender, receiver) = oneshot::channel();
210-
stdin_sender
211-
.send(StdinEvent::Initialize {
212-
sender,
213-
setup_commands,
214-
})
215-
.await
216-
.into_diagnostic()?;
200+
ret.stdin.initialize(sender, setup_commands).await?;
217201
receiver.await.into_diagnostic()?;
218202
}
219203

@@ -317,10 +301,7 @@ impl Ghci {
317301
format_bulleted_list(&needs_reload)
318302
);
319303
let (sender, receiver) = oneshot::channel();
320-
self.stdin_channel
321-
.send(StdinEvent::Reload(sender))
322-
.await
323-
.into_diagnostic()?;
304+
self.stdin.reload(sender).await?;
324305
let reload_result = receiver.await.into_diagnostic()?;
325306
if let Some(CompilationResult::Err) = reload_result {
326307
compilation_failed = true;
@@ -333,13 +314,7 @@ impl Ghci {
333314
} else {
334315
// If we loaded or reloaded any modules, we should run tests.
335316
let (sender, receiver) = oneshot::channel();
336-
self.stdin_channel
337-
.send(StdinEvent::Test {
338-
sender,
339-
test_command: self.test_command.clone(),
340-
})
341-
.await
342-
.into_diagnostic()?;
317+
self.stdin.test(sender, self.test_command.clone()).await?;
343318
receiver.await.into_diagnostic()?;
344319
}
345320
}
@@ -352,27 +327,18 @@ impl Ghci {
352327
/// Sync the input and output streams of this `ghci` session. This will block until all input
353328
/// written to the `ghci` process's stdin has been read and processed.
354329
#[instrument(skip_all, level = "debug")]
355-
pub async fn sync(&self) -> miette::Result<()> {
330+
pub async fn sync(&mut self) -> miette::Result<()> {
356331
let (sentinel, receiver) = SyncSentinel::new(&self.sync_count);
357-
self.stdin_channel
358-
.send(StdinEvent::Sync(sentinel))
359-
.await
360-
.into_diagnostic()?;
332+
self.stdin.sync(sentinel).await?;
361333
receiver.await.into_diagnostic()?;
362334
Ok(())
363335
}
364336

365337
/// Run the user provided test command.
366338
#[instrument(skip_all, level = "debug")]
367-
pub async fn test(&self) -> miette::Result<()> {
339+
pub async fn test(&mut self) -> miette::Result<()> {
368340
let (sender, receiver) = oneshot::channel();
369-
self.stdin_channel
370-
.send(StdinEvent::Test {
371-
sender,
372-
test_command: self.test_command.clone(),
373-
})
374-
.await
375-
.into_diagnostic()?;
341+
self.stdin.test(sender, self.test_command.clone()).await?;
376342
receiver.await.into_diagnostic()?;
377343
Ok(())
378344
}
@@ -381,10 +347,7 @@ impl Ghci {
381347
#[instrument(skip_all, level = "debug")]
382348
pub async fn refresh_modules(&mut self) -> miette::Result<()> {
383349
let (sender, receiver) = oneshot::channel();
384-
self.stdin_channel
385-
.send(StdinEvent::ShowModules(sender))
386-
.await
387-
.into_diagnostic()?;
350+
self.stdin.show_modules(sender).await?;
388351
let map = receiver.await.into_diagnostic()?;
389352
self.modules = map;
390353
tracing::debug!(
@@ -403,10 +366,7 @@ impl Ghci {
403366
path: Utf8PathBuf,
404367
) -> miette::Result<Option<CompilationResult>> {
405368
let (sender, receiver) = oneshot::channel();
406-
self.stdin_channel
407-
.send(StdinEvent::AddModule(path.clone(), sender))
408-
.await
409-
.into_diagnostic()?;
369+
self.stdin.add_module(path.clone(), sender).await?;
410370
let result = receiver.await.into_diagnostic()?;
411371
match result {
412372
None => {
@@ -430,9 +390,8 @@ impl Ghci {
430390
async fn stop(&mut self) -> miette::Result<()> {
431391
// TODO: Worth canceling the `mpsc::Receiver`s in the tasks here?
432392
// I'd need to add events for it.
433-
self.stdout.abort();
434-
self.stderr.abort();
435-
self.stdin.abort();
393+
self.stdout_handle.abort();
394+
self.stderr_handle.abort();
436395

437396
// Kill the old `ghci` process.
438397
// TODO: Worth trying `SIGINT` or closing stdin here?

src/ghci/stdin.rs

Lines changed: 9 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use std::time::Instant;
22

3-
use backoff::backoff::Backoff;
4-
use backoff::ExponentialBackoff;
53
use camino::Utf8PathBuf;
64
use miette::IntoDiagnostic;
75
use tokio::io::AsyncWriteExt;
@@ -22,101 +20,21 @@ use super::Mode;
2220
use super::IO_MODULE_NAME;
2321
use super::PROMPT;
2422

25-
/// An event sent to a `ghci` session's stdin channel.
26-
#[derive(Debug)]
27-
pub enum StdinEvent {
28-
/// Initialize the `ghci` session; sets the initial imports, changes the prompt, etc.
29-
Initialize {
30-
sender: oneshot::Sender<()>,
31-
setup_commands: Vec<String>,
32-
},
33-
/// Reload the `ghci` session with `:reload`.
34-
Reload(oneshot::Sender<Option<CompilationResult>>),
35-
/// Run the user-provided test command, if any.
36-
Test {
37-
sender: oneshot::Sender<()>,
38-
test_command: Option<String>,
39-
},
40-
/// Add a module to the `ghci` session by path with `:add`.
41-
AddModule(Utf8PathBuf, oneshot::Sender<Option<CompilationResult>>),
42-
/// Sync the `ghci` session's input/output.
43-
Sync(SyncSentinel),
44-
/// Show the currently loaded modules with `:show modules`.
45-
ShowModules(oneshot::Sender<ModuleSet>),
46-
}
47-
4823
pub struct GhciStdin {
4924
/// Inner stdin writer.
5025
pub stdin: ChildStdin,
5126
/// Channel sender for communicating with the stdout task.
5227
pub stdout_sender: mpsc::Sender<StdoutEvent>,
5328
/// Channel sender for communicating with the stderr task.
5429
pub stderr_sender: mpsc::Sender<StderrEvent>,
55-
/// Channel receiver for communicating with this task.
56-
pub receiver: mpsc::Receiver<StdinEvent>,
5730
}
5831

5932
impl GhciStdin {
60-
#[instrument(skip_all, name = "stdin", level = "debug")]
61-
pub async fn run(mut self) -> miette::Result<()> {
62-
let mut backoff = ExponentialBackoff::default();
63-
while let Some(duration) = backoff.next_backoff() {
64-
match self.run_inner().await {
65-
Ok(()) => {
66-
// MPSC channel closed, probably a graceful shutdown?
67-
tracing::debug!("Channel closed");
68-
break;
69-
}
70-
Err(err) => {
71-
tracing::error!("{err:?}");
72-
}
73-
}
74-
75-
tracing::debug!("Waiting {duration:?} before retrying");
76-
tokio::time::sleep(duration).await;
77-
}
78-
79-
Ok(())
80-
}
81-
82-
pub async fn run_inner(&mut self) -> miette::Result<()> {
83-
while let Some(event) = self.receiver.recv().await {
84-
match event {
85-
StdinEvent::Initialize {
86-
sender,
87-
setup_commands,
88-
} => {
89-
self.initialize(sender, setup_commands).await?;
90-
}
91-
StdinEvent::Reload(sender) => {
92-
self.reload(sender).await?;
93-
}
94-
StdinEvent::Test {
95-
sender,
96-
test_command,
97-
} => {
98-
self.test(sender, test_command).await?;
99-
}
100-
StdinEvent::AddModule(path, sender) => {
101-
self.add_module(path, sender).await?;
102-
}
103-
StdinEvent::Sync(sentinel) => {
104-
self.sync(sentinel).await?;
105-
}
106-
StdinEvent::ShowModules(sender) => {
107-
self.show_modules(sender).await?;
108-
}
109-
}
110-
}
111-
112-
Ok(())
113-
}
114-
11533
/// Write a line on `stdin` and wait for a prompt on stdout.
11634
///
11735
/// The `line` should contain the trailing newline.
11836
#[instrument(skip(self), level = "debug")]
119-
async fn write_line(&mut self, line: &str) -> miette::Result<()> {
37+
pub async fn write_line(&mut self, line: &str) -> miette::Result<()> {
12038
let (sender, receiver) = oneshot::channel();
12139
self.write_line_sender(line, sender).await?;
12240
receiver.await.into_diagnostic()?;
@@ -128,7 +46,7 @@ impl GhciStdin {
12846
///
12947
/// The `line` should contain the trailing newline.
13048
#[instrument(skip(self, sender), level = "debug")]
131-
async fn write_line_sender(
49+
pub async fn write_line_sender(
13250
&mut self,
13351
line: &str,
13452
sender: oneshot::Sender<Option<CompilationResult>>,
@@ -145,7 +63,7 @@ impl GhciStdin {
14563
}
14664

14765
#[instrument(skip(self, sender), level = "debug")]
148-
async fn initialize(
66+
pub async fn initialize(
14967
&mut self,
15068
sender: oneshot::Sender<()>,
15169
setup_commands: Vec<String>,
@@ -167,7 +85,7 @@ impl GhciStdin {
16785
}
16886

16987
#[instrument(skip_all, level = "debug")]
170-
async fn reload(
88+
pub async fn reload(
17189
&mut self,
17290
sender: oneshot::Sender<Option<CompilationResult>>,
17391
) -> miette::Result<()> {
@@ -177,7 +95,7 @@ impl GhciStdin {
17795
}
17896

17997
#[instrument(skip_all, level = "debug")]
180-
async fn test(
98+
pub async fn test(
18199
&mut self,
182100
sender: oneshot::Sender<()>,
183101
test_command: Option<String>,
@@ -197,7 +115,7 @@ impl GhciStdin {
197115
}
198116

199117
#[instrument(skip(self, sender), level = "debug")]
200-
async fn add_module(
118+
pub async fn add_module(
201119
&mut self,
202120
path: Utf8PathBuf,
203121
sender: oneshot::Sender<Option<CompilationResult>>,
@@ -217,7 +135,7 @@ impl GhciStdin {
217135
}
218136

219137
#[instrument(skip(self), level = "debug")]
220-
async fn sync(&mut self, sentinel: SyncSentinel) -> miette::Result<()> {
138+
pub async fn sync(&mut self, sentinel: SyncSentinel) -> miette::Result<()> {
221139
self.set_mode(Mode::Internal).await?;
222140

223141
self.stdin
@@ -236,7 +154,7 @@ impl GhciStdin {
236154
}
237155

238156
#[instrument(skip(self, sender), level = "debug")]
239-
async fn show_modules(&mut self, sender: oneshot::Sender<ModuleSet>) -> miette::Result<()> {
157+
pub async fn show_modules(&mut self, sender: oneshot::Sender<ModuleSet>) -> miette::Result<()> {
240158
self.set_mode(Mode::Internal).await?;
241159

242160
self.stdin
@@ -252,7 +170,7 @@ impl GhciStdin {
252170
}
253171

254172
#[instrument(skip(self), level = "debug")]
255-
async fn set_mode(&self, mode: Mode) -> miette::Result<()> {
173+
pub async fn set_mode(&self, mode: Mode) -> miette::Result<()> {
256174
let mut set = JoinSet::<Result<(), oneshot::error::RecvError>>::new();
257175

258176
{

0 commit comments

Comments
 (0)