Skip to content

Commit c35f95a

Browse files
authored
DUX-5090 send_sigint: exponential backoff loop + sync barrier (#433)
1 parent 44b6df1 commit c35f95a

4 files changed

Lines changed: 190 additions & 23 deletions

File tree

src/ghci/manager.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,30 @@ impl GhciManager {
333333

334334
// Send a SIGINT to interrupt the reload.
335335
// NB: This may take a couple seconds to register.
336-
ghci.lock().await.send_sigint().await?;
337-
338-
return Ok(HandleResult::Interrupted(event));
336+
match ghci.lock().await.send_sigint().await {
337+
Ok(()) => return Ok(HandleResult::Interrupted(event)),
338+
Err(e) => {
339+
// `send_sigint` may kill the session if it
340+
// cannot leave ghci in a usable state (e.g.
341+
// sync barrier failure). Wait for the exit
342+
// and route through the standard restart
343+
// path with the merged event preserved.
344+
tracing::warn!(
345+
error = ?e,
346+
"Failed to interrupt ghci; session was killed for restart",
347+
);
348+
pending_event = Some(event);
349+
let status = exited_receiver
350+
.recv()
351+
.await
352+
.ok_or_else(|| {
353+
miette::miette!(
354+
"ghci exit channel closed after kill"
355+
)
356+
})?;
357+
break Some(status);
358+
}
359+
}
339360
}
340361
}
341362
}

src/ghci/mod.rs

Lines changed: 138 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@ use std::fmt::Debug;
1313
use std::io::IsTerminal;
1414
use std::process::ExitStatus;
1515
use std::process::Stdio;
16+
use std::time::Duration;
1617
use std::time::Instant;
1718
use tokio::io::DuplexStream;
1819
use tokio::sync::oneshot;
1920
use tokio::task::JoinHandle;
2021

2122
use aho_corasick::AhoCorasick;
23+
use backoff::backoff::Backoff;
24+
use backoff::ExponentialBackoff;
2225
use camino::Utf8Path;
2326
use camino::Utf8PathBuf;
2427
use miette::miette;
@@ -268,6 +271,8 @@ pub struct Ghci {
268271
search_paths: ShowPaths,
269272
/// Tasks running `async:` shell commands in the background.
270273
command_handles: Vec<JoinHandle<miette::Result<ExitStatus>>>,
274+
/// Monotonic counter for generating unique sync barrier nonces.
275+
sync_nonce: u64,
271276
}
272277

273278
impl Debug for Ghci {
@@ -398,6 +403,7 @@ impl Ghci {
398403
search_paths: Default::default(),
399404
},
400405
command_handles,
406+
sync_nonce: 0,
401407
})
402408
}
403409

@@ -837,20 +843,141 @@ impl Ghci {
837843
Ok(())
838844
}
839845

846+
/// Interrupt the running GHCi session.
847+
///
848+
/// On `Err`, the GHCi session may have been killed (e.g. because the sync
849+
/// barrier could not restore the prompt). Callers MUST treat an error here
850+
/// as a session-died event and route through the normal restart path
851+
/// rather than propagating it as fatal. See [`Ghci::sync_barrier`] for details.
840852
#[instrument(skip_all, level = "debug")]
841853
async fn send_sigint(&mut self) -> miette::Result<()> {
842854
let start_instant = Instant::now();
843-
signal::killpg(self.process_group_id, Signal::SIGINT)
844-
.into_diagnostic()
845-
.wrap_err("Failed to send `Ctrl-C` (`SIGINT`) to ghci session")?;
846-
self.stdout
847-
.prompt(
848-
crate::incremental_reader::FindAt::Anywhere,
849-
// Ignore compilation messages.
850-
&mut Default::default(),
851-
)
852-
.await?;
853-
tracing::debug!("Interrupted ghci in {:.2?}", start_instant.elapsed());
855+
856+
// Phase 1: Send SIGINT repeatedly until we find a clean, uninterrupted prompt.
857+
//
858+
// An interrupted reload can cause interleaved output between the GHCi prompt and
859+
// compilation output (due to GHC bug where the logging thread isn't stopped on
860+
// async exception — see `runParPipelines` in GHC's Driver/Make.hs). We send
861+
// SIGINT with exponential backoff until we see a prompt that isn't garbled.
862+
let mut backoff = ExponentialBackoff {
863+
initial_interval: Duration::from_millis(5),
864+
max_interval: Duration::from_millis(100),
865+
multiplier: 1.25,
866+
max_elapsed_time: Some(Duration::from_secs(10)),
867+
..Default::default()
868+
};
869+
870+
let mut sigint_count: usize = 0;
871+
loop {
872+
let Some(delay) = backoff.next_backoff() else {
873+
return Err(miette!(
874+
"Timed out waiting for GHCi to respond to SIGINT after {:.2?}",
875+
start_instant.elapsed()
876+
));
877+
};
878+
879+
sigint_count += 1;
880+
signal::killpg(self.process_group_id, Signal::SIGINT)
881+
.into_diagnostic()
882+
.wrap_err("Failed to send `Ctrl-C` (`SIGINT`) to ghci session")?;
883+
tracing::debug!(count = sigint_count, "Sent SIGINT");
884+
885+
let found = self.stdout.buffer_and_drain_prompts(delay).await?;
886+
if found > 0 {
887+
tracing::debug!(
888+
found,
889+
elapsed = ?start_instant.elapsed(),
890+
"Found prompt after SIGINT"
891+
);
892+
break;
893+
}
894+
}
895+
896+
// If we only sent 1 SIGINT, then there cannot be extra prompts waiting to be read from the
897+
// buffer; only do the sync barrier process if we sent multiple SIGINTs.
898+
if sigint_count > 1 {
899+
self.sync_barrier().await?;
900+
}
901+
902+
tracing::info!("Interrupted ghci in {:.2?}", start_instant.elapsed());
903+
Ok(())
904+
}
905+
906+
/// Sync barrier: deterministically consume all stale prompts from the pipe.
907+
///
908+
/// We rely on the fact that GHCi processes input commands one at a time, in order. When we send
909+
/// a command to GHCi, we read its output up until the next prompt and know that the output
910+
/// we've read matches the command we sent. This is important because we parse GHCi output in
911+
/// several places (e.g. compilation errors go to the `error_log`, `:show paths` and `:show
912+
/// targets` are used to inform module additions/removals/reloads, etc.), so if we're parsing
913+
/// output from a different command, we'll Have Problems.
914+
///
915+
/// When we're hitting Ctrl-C repeatedly (in case of a user input prompt interleaved with
916+
/// compilation output in GHCi's stdout stream), we don't know how many times GHCi will print a
917+
/// prompt that we can read.
918+
///
919+
/// Therefore, we _change_ the prompt and read until _that_ specific prompt shows up in the
920+
/// output, using a unique (to the `ghci` process) and different prompt each time we call this
921+
/// method. This ensures we consume all remaining stale output, without having to wait until we
922+
/// "think it's safe" and wasting the user's time after GHCi is done writing.
923+
#[instrument(skip_all, level = "debug")]
924+
async fn sync_barrier(&mut self) -> miette::Result<()> {
925+
self.sync_nonce += 1;
926+
let nonce = self.sync_nonce;
927+
let sync_marker = format!("~~~GHCIWATCH-SYNC-{nonce}~~~");
928+
929+
// Set the prompt to our sync marker.
930+
self.stdin
931+
.write_set_prompt(&sync_marker)
932+
.await
933+
.wrap_err("Failed to write sync command to ghci stdin")?;
934+
935+
// From here until the prompt is restored, any failure leaves the session
936+
// unable to match `PROMPT` again. Restoring in-band after a failed read
937+
// is not safe (the buffer is in an unknown state, and confirming the
938+
// restore would itself depend on prompt matching), so on any error we
939+
// SIGKILL the process and let the manager restart the session.
940+
let sync_timeout = Duration::from_secs(3);
941+
let read =
942+
tokio::time::timeout(sync_timeout, self.stdout.read_until_marker(&sync_marker)).await;
943+
let result = match read {
944+
Ok(Ok(_ghci_output)) => self
945+
.stdin
946+
.set_prompt(
947+
&mut self.stdout,
948+
PROMPT,
949+
crate::incremental_reader::FindAt::LineStart,
950+
// We don't expect to see any compilation here, so we pass a stub
951+
// `CompilationLog` and discard it.
952+
&mut Default::default(),
953+
)
954+
.await
955+
.wrap_err("Failed to restore prompt after sync barrier"),
956+
Ok(Err(e)) => Err(e).wrap_err("Failed to read until sync marker"),
957+
Err(_elapsed) => Err(miette!(
958+
"Timed out waiting for GHCi sync marker after {sync_timeout:?}"
959+
)),
960+
};
961+
962+
if let Err(e) = result {
963+
// Kill the process directly rather than going through `restart_sender`.
964+
// `restart_sender` is the graceful-shutdown path: `GhciProcess` consumes it
965+
// and intentionally suppresses `exited_sender`, so the manager would never
966+
// learn ghci died. We need the wait future in `GhciProcess::run` to win the
967+
// select so `exited_sender` fires and `wait_and_restart_runtime` takes over.
968+
if let Err(kill_err) = signal::killpg(self.process_group_id, Signal::SIGKILL)
969+
.into_diagnostic()
970+
.wrap_err("Failed to send `SIGKILL` to ghci session")
971+
{
972+
tracing::error!(
973+
error = %kill_err,
974+
"Failed to SIGKILL ghci after sync_barrier failure",
975+
);
976+
}
977+
return Err(e).wrap_err(
978+
"ghci sync barrier failed; killed the session because the prompt could not be restored",
979+
);
980+
}
854981
Ok(())
855982
}
856983

src/ghci/stdin.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,32 @@ impl GhciStdin {
7171
Ok(())
7272
}
7373

74+
/// Write `:set prompt "{prompt}"\n` to stdin without reading any response.
75+
///
76+
/// Callers that need to wait for GHCi to acknowledge the new prompt should use
77+
/// [`Self::set_prompt`] instead.
78+
pub async fn write_set_prompt(&mut self, prompt: &str) -> miette::Result<()> {
79+
self.stdin
80+
.write_all(format!(":set prompt \"{prompt}\"\n").as_bytes())
81+
.await
82+
.into_diagnostic()
83+
}
84+
85+
/// Set the GHCi prompt to the given string.
86+
///
87+
/// This writes `:set prompt` and waits for GHCi to show the new prompt.
88+
#[instrument(skip(self, stdout), level = "debug")]
89+
pub async fn set_prompt(
90+
&mut self,
91+
stdout: &mut GhciStdout,
92+
prompt: &str,
93+
find: FindAt,
94+
log: &mut CompilationLog,
95+
) -> miette::Result<()> {
96+
self.write_set_prompt(prompt).await?;
97+
stdout.prompt(find, log).await
98+
}
99+
74100
#[instrument(skip(self, stdout), name = "stdin_initialize", level = "debug")]
75101
pub async fn initialize(
76102
&mut self,
@@ -79,13 +105,8 @@ impl GhciStdin {
79105
) -> miette::Result<()> {
80106
// We tell stdout/stderr we're compiling for the first prompt because this includes all the
81107
// module compilation before the first prompt.
82-
self.write_line_with_prompt_at(
83-
stdout,
84-
&format!(":set prompt {PROMPT}\n"),
85-
FindAt::Anywhere,
86-
log,
87-
)
88-
.await?;
108+
self.set_prompt(stdout, PROMPT, FindAt::Anywhere, log)
109+
.await?;
89110
self.write_line(stdout, &format!(":set prompt-cont {PROMPT}\n"), log)
90111
.await?;
91112
Ok(())

src/ghci/stdout.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ impl GhciStdout {
102102

103103
/// Read any immediately-available output from the pipe, then drain stale prompts from
104104
/// the internal buffer. Returns the number of prompts found and discarded.
105-
#[expect(unused)]
106105
pub async fn buffer_and_drain_prompts(&mut self, timeout: Duration) -> miette::Result<usize> {
107106
self.reader
108107
.buffer_available(&mut self.buffer, timeout, WriteBehavior::NoFinalLine)
@@ -123,7 +122,6 @@ impl GhciStdout {
123122
/// Used by `send_sigint` to synchronize with GHCi after an interrupt: a sync expression
124123
/// is sent on stdin and this method reads until its output appears, guaranteeing that all
125124
/// prior output has been consumed.
126-
#[expect(unused)]
127125
pub async fn read_until_marker(&mut self, marker: &str) -> miette::Result<String> {
128126
let pattern = AhoCorasick::from_anchored_patterns([marker]);
129127
self.reader

0 commit comments

Comments
 (0)