Skip to content

Commit 384c3ec

Browse files
committed
Shut down DAP server gracefully on disconnection (#1018)
1 parent 6f8acef commit 384c3ec

File tree

2 files changed

+109
-81
lines changed

2 files changed

+109
-81
lines changed

AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ If changes are needed in these files, that must happen in the separate Positron
8989

9090
- When a `match` expression is the last expression in a function, omit `return` keywords in match arms. Let the expression evaluate to the function's return value.
9191

92-
- For error messages and logging, prefer direct formatting syntax: `Err(anyhow!("Message: {err}"))` instead of `Err(anyhow!("Message: {}", err))`. This also applies to `log::error!` and `log::warn!` and `log::info!` macros.
92+
- For error messages and logging, prefer direct formatting syntax: `Err(anyhow!("Message: {err}"))` instead of `Err(anyhow!("Message: {}", err))`. This also applies to `log::error!` and `log::warn!` and `log::info!` macros. For logging errors specifically, use Debug formatting `{err:?}` to get more detailed error information.
9393

9494
- Use `log::trace!` instead of `log::debug!`.
9595

crates/ark/src/dap/dap_server.rs

Lines changed: 108 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crossbeam::channel::unbounded;
2222
use crossbeam::channel::Receiver;
2323
use crossbeam::channel::Sender;
2424
use crossbeam::select;
25+
use dap::errors::ServerError;
2526
use dap::events::*;
2627
use dap::prelude::*;
2728
use dap::requests::*;
@@ -150,9 +151,18 @@ fn listen_dap_events<W: Write>(
150151
loop {
151152
select!(
152153
recv(backend_events_rx) -> event => {
154+
let event = match event {
155+
Ok(event) => event,
156+
Err(err) => {
157+
// Channel closed, sender dropped
158+
log::info!("DAP: Event channel closed: {err:?}");
159+
return;
160+
},
161+
};
162+
153163
log::trace!("DAP: Got event from backend: {:?}", event);
154164

155-
let event = match event.unwrap() {
165+
let event = match event {
156166
DapBackendEvent::Continued => {
157167
Event::Continued(ContinuedEventBody {
158168
thread_id: THREAD_ID,
@@ -191,7 +201,10 @@ fn listen_dap_events<W: Write>(
191201
};
192202

193203
let mut output = output.lock().unwrap();
194-
output.send_event(event).unwrap();
204+
if let Err(err) = output.send_event(event) {
205+
log::warn!("DAP: Failed to send event, closing: {err:?}");
206+
return;
207+
}
195208
},
196209

197210
// Break the loop and terminate the thread
@@ -229,91 +242,83 @@ impl<R: Read, W: Write> DapServer<R, W> {
229242

230243
pub fn serve(&mut self) -> bool {
231244
log::trace!("DAP: Polling");
232-
let req = match self.server.poll_request().unwrap() {
233-
Some(req) => req,
234-
None => return false,
245+
let req = match self.server.poll_request() {
246+
Ok(Some(req)) => req,
247+
Ok(None) => return false,
248+
Err(err) => {
249+
log::warn!("DAP: Connection closed: {err:?}");
250+
return false;
251+
},
235252
};
236253
log::trace!("DAP: Got request: {:#?}", req);
237254

238255
let cmd = req.command.clone();
239256

240-
match cmd {
241-
Command::Initialize(args) => {
242-
self.handle_initialize(req, args);
243-
},
244-
Command::Attach(args) => {
245-
self.handle_attach(req, args);
246-
},
247-
Command::Disconnect(args) => {
248-
self.handle_disconnect(req, args);
249-
},
250-
Command::Restart(args) => {
251-
self.handle_restart(req, args);
252-
},
253-
Command::Threads => {
254-
self.handle_threads(req);
255-
},
256-
Command::SetBreakpoints(args) => {
257-
self.handle_set_breakpoints(req, args);
258-
},
257+
let result = match cmd {
258+
Command::Initialize(args) => self.handle_initialize(req, args),
259+
Command::Attach(args) => self.handle_attach(req, args),
260+
Command::Disconnect(args) => self.handle_disconnect(req, args),
261+
Command::Restart(args) => self.handle_restart(req, args),
262+
Command::Threads => self.handle_threads(req),
263+
Command::SetBreakpoints(args) => self.handle_set_breakpoints(req, args),
259264
Command::SetExceptionBreakpoints(args) => {
260-
self.handle_set_exception_breakpoints(req, args);
261-
},
262-
Command::StackTrace(args) => {
263-
self.handle_stacktrace(req, args);
264-
},
265-
Command::Source(args) => {
266-
self.handle_source(req, args);
267-
},
268-
Command::Scopes(args) => {
269-
self.handle_scopes(req, args);
270-
},
271-
Command::Variables(args) => {
272-
self.handle_variables(req, args);
265+
self.handle_set_exception_breakpoints(req, args)
273266
},
267+
Command::StackTrace(args) => self.handle_stacktrace(req, args),
268+
Command::Source(args) => self.handle_source(req, args),
269+
Command::Scopes(args) => self.handle_scopes(req, args),
270+
Command::Variables(args) => self.handle_variables(req, args),
274271
Command::Continue(args) => {
275272
let resp = ResponseBody::Continue(ContinueResponse {
276273
all_threads_continued: Some(true),
277274
});
278-
self.handle_step(req, args, DebugRequest::Continue, resp);
275+
self.handle_step(req, args, DebugRequest::Continue, resp)
279276
},
280277
Command::Next(args) => {
281-
self.handle_step(req, args, DebugRequest::Next, ResponseBody::Next);
278+
self.handle_step(req, args, DebugRequest::Next, ResponseBody::Next)
282279
},
283280
Command::StepIn(args) => {
284-
self.handle_step(req, args, DebugRequest::StepIn, ResponseBody::StepIn);
281+
self.handle_step(req, args, DebugRequest::StepIn, ResponseBody::StepIn)
285282
},
286283
Command::StepOut(args) => {
287-
self.handle_step(req, args, DebugRequest::StepOut, ResponseBody::StepOut);
284+
self.handle_step(req, args, DebugRequest::StepOut, ResponseBody::StepOut)
288285
},
289286
_ => {
290287
log::warn!("DAP: Unknown request");
291288
let rsp = req.error("Ark DAP: Unknown request");
292-
self.server.respond(rsp).unwrap();
289+
self.respond(rsp)
293290
},
291+
};
292+
293+
if let Err(err) = result {
294+
log::warn!("DAP: Handler failed, closing connection: {err:?}");
295+
return false;
294296
}
295297

296298
true
297299
}
298300

299-
fn respond(&mut self, rsp: Response) {
301+
fn respond(&mut self, rsp: Response) -> Result<(), ServerError> {
300302
log::trace!("DAP: Responding to request: {rsp:#?}");
301-
self.server.respond(rsp).unwrap();
303+
self.server.respond(rsp)
302304
}
303305

304-
fn send_event(&mut self, event: Event) {
306+
fn send_event(&mut self, event: Event) -> Result<(), ServerError> {
305307
log::trace!("DAP: Sending event: {event:#?}");
306-
self.server.send_event(event).unwrap();
308+
self.server.send_event(event)
307309
}
308310

309-
fn handle_initialize(&mut self, req: Request, _args: InitializeArguments) {
311+
fn handle_initialize(
312+
&mut self,
313+
req: Request,
314+
_args: InitializeArguments,
315+
) -> Result<(), ServerError> {
310316
let rsp = req.success(ResponseBody::Initialize(types::Capabilities {
311317
supports_restart_request: Some(true),
312318
..Default::default()
313319
}));
314-
self.respond(rsp);
315-
316-
self.send_event(Event::Initialized);
320+
self.respond(rsp)?;
321+
self.send_event(Event::Initialized)
317322
}
318323

319324
// Handle SetBreakpoints requests from the frontend.
@@ -331,12 +336,15 @@ impl<R: Read, W: Write> DapServer<R, W> {
331336
// - When a user unchecks a breakpoint, it appears as a deletion (omitted
332337
// from the request). We preserve verified breakpoints as Disabled so we
333338
// can restore their state when re-enabled without requiring re-sourcing.
334-
fn handle_set_breakpoints(&mut self, req: Request, args: SetBreakpointsArguments) {
339+
fn handle_set_breakpoints(
340+
&mut self,
341+
req: Request,
342+
args: SetBreakpointsArguments,
343+
) -> Result<(), ServerError> {
335344
let Some(path) = args.source.path.as_ref() else {
336345
// We don't currently have virtual documents managed via source references
337346
log::warn!("Missing a path to set breakpoints for.");
338-
self.respond(req.error("Missing a path to set breakpoints for"));
339-
return;
347+
return self.respond(req.error("Missing a path to set breakpoints for"));
340348
};
341349

342350
// We currently only support "path" URIs as Positron never sends URIs.
@@ -349,8 +357,7 @@ impl<R: Read, W: Write> DapServer<R, W> {
349357
let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse {
350358
breakpoints: vec![],
351359
}));
352-
self.respond(rsp);
353-
return;
360+
return self.respond(rsp);
354361
},
355362
};
356363

@@ -361,10 +368,9 @@ impl<R: Read, W: Write> DapServer<R, W> {
361368
Ok(content) => content,
362369
Err(err) => {
363370
// TODO: What do we do with breakpoints in virtual documents?
364-
log::error!("Failed to read file '{path}': {err}");
371+
log::warn!("Failed to read file '{path}': {err:?}");
365372
let rsp = req.error(&format!("Failed to read file: {path}"));
366-
self.respond(rsp);
367-
return;
373+
return self.respond(rsp);
368374
},
369375
};
370376

@@ -502,31 +508,39 @@ impl<R: Read, W: Write> DapServer<R, W> {
502508
breakpoints: response_breakpoints,
503509
}));
504510

505-
self.respond(rsp);
511+
self.respond(rsp)
506512
}
507513

508-
fn handle_attach(&mut self, req: Request, _args: AttachRequestArguments) {
514+
fn handle_attach(
515+
&mut self,
516+
req: Request,
517+
_args: AttachRequestArguments,
518+
) -> Result<(), ServerError> {
509519
let rsp = req.success(ResponseBody::Attach);
510-
self.respond(rsp);
520+
self.respond(rsp)?;
511521

512522
self.send_event(Event::Thread(ThreadEventBody {
513523
reason: ThreadEventReason::Started,
514524
thread_id: THREAD_ID,
515-
}));
525+
}))
516526
}
517527

518-
fn handle_disconnect(&mut self, req: Request, _args: DisconnectArguments) {
528+
fn handle_disconnect(
529+
&mut self,
530+
req: Request,
531+
_args: DisconnectArguments,
532+
) -> Result<(), ServerError> {
519533
// Only send `Q` if currently in a debugging session.
520534
let is_debugging = { self.state.lock().unwrap().is_debugging };
521535
if is_debugging {
522536
self.send_command(DebugRequest::Quit);
523537
}
524538

525539
let rsp = req.success(ResponseBody::Disconnect);
526-
self.respond(rsp);
540+
self.respond(rsp)
527541
}
528542

529-
fn handle_restart<T>(&mut self, req: Request, _args: T) {
543+
fn handle_restart<T>(&mut self, req: Request, _args: T) -> Result<(), ServerError> {
530544
// If connected to Positron, forward the restart command to the
531545
// frontend. Otherwise ignore it.
532546
if let Some(tx) = &self.comm_tx {
@@ -535,35 +549,39 @@ impl<R: Read, W: Write> DapServer<R, W> {
535549
}
536550

537551
let rsp = req.success(ResponseBody::Restart);
538-
self.respond(rsp);
552+
self.respond(rsp)
539553
}
540554

541555
// All servers must respond to `Threads` requests, possibly with
542556
// a dummy thread as is the case here
543-
fn handle_threads(&mut self, req: Request) {
557+
fn handle_threads(&mut self, req: Request) -> Result<(), ServerError> {
544558
let rsp = req.success(ResponseBody::Threads(ThreadsResponse {
545559
threads: vec![Thread {
546560
id: THREAD_ID,
547561
name: String::from("R console"),
548562
}],
549563
}));
550-
self.respond(rsp);
564+
self.respond(rsp)
551565
}
552566

553567
fn handle_set_exception_breakpoints(
554568
&mut self,
555569
req: Request,
556570
_args: SetExceptionBreakpointsArguments,
557-
) {
571+
) -> Result<(), ServerError> {
558572
let rsp = req.success(ResponseBody::SetExceptionBreakpoints(
559573
SetExceptionBreakpointsResponse {
560574
breakpoints: None, // TODO
561575
},
562576
));
563-
self.respond(rsp);
577+
self.respond(rsp)
564578
}
565579

566-
fn handle_stacktrace(&mut self, req: Request, args: StackTraceArguments) {
580+
fn handle_stacktrace(
581+
&mut self,
582+
req: Request,
583+
args: StackTraceArguments,
584+
) -> Result<(), ServerError> {
567585
let state = self.state.lock().unwrap();
568586
let stack = &state.stack;
569587
let fallback_sources = &state.fallback_sources;
@@ -597,17 +615,17 @@ impl<R: Read, W: Write> DapServer<R, W> {
597615
}));
598616

599617
drop(state);
600-
self.respond(rsp);
618+
self.respond(rsp)
601619
}
602620

603-
fn handle_source(&mut self, req: Request, _args: SourceArguments) {
621+
fn handle_source(&mut self, req: Request, _args: SourceArguments) -> Result<(), ServerError> {
604622
let message = "Unsupported `source` request: {req:?}";
605623
log::error!("{message}");
606624
let rsp = req.error(message);
607-
self.respond(rsp);
625+
self.respond(rsp)
608626
}
609627

610-
fn handle_scopes(&mut self, req: Request, args: ScopesArguments) {
628+
fn handle_scopes(&mut self, req: Request, args: ScopesArguments) -> Result<(), ServerError> {
611629
let state = self.state.lock().unwrap();
612630
let frame_id_to_variables_reference = &state.frame_id_to_variables_reference;
613631

@@ -637,15 +655,19 @@ impl<R: Read, W: Write> DapServer<R, W> {
637655
let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes }));
638656

639657
drop(state);
640-
self.respond(rsp);
658+
self.respond(rsp)
641659
}
642660

643-
fn handle_variables(&mut self, req: Request, args: VariablesArguments) {
661+
fn handle_variables(
662+
&mut self,
663+
req: Request,
664+
args: VariablesArguments,
665+
) -> Result<(), ServerError> {
644666
let variables_reference = args.variables_reference;
645667
let variables = self.collect_r_variables(variables_reference);
646668
let variables = self.into_variables(variables);
647669
let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables }));
648-
self.respond(rsp);
670+
self.respond(rsp)
649671
}
650672

651673
fn collect_r_variables(&self, variables_reference: i64) -> Vec<RVariable> {
@@ -708,10 +730,16 @@ impl<R: Read, W: Write> DapServer<R, W> {
708730
out
709731
}
710732

711-
fn handle_step<A>(&mut self, req: Request, _args: A, cmd: DebugRequest, resp: ResponseBody) {
733+
fn handle_step<A>(
734+
&mut self,
735+
req: Request,
736+
_args: A,
737+
cmd: DebugRequest,
738+
resp: ResponseBody,
739+
) -> Result<(), ServerError> {
712740
self.send_command(cmd);
713741
let rsp = req.success(resp);
714-
self.respond(rsp);
742+
self.respond(rsp)
715743
}
716744

717745
fn send_command(&mut self, cmd: DebugRequest) {

0 commit comments

Comments
 (0)