Skip to content

Commit a878923

Browse files
committed
refactor(rpc): move server lifecycle into rpc library
Move generic RPC server run, ready, and stop handling from dune_rpc_impl into Rpc.Server.Lifecycle. Keep Dune-specific startup-failure handling and the status-line wrapper in dune_rpc_impl. Signed-off-by: Rudi Grinberg <me@rgrinberg.com>
1 parent 36833d7 commit a878923

5 files changed

Lines changed: 100 additions & 82 deletions

File tree

src/dune_rpc_impl/dune

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
source
1010
memo
1111
dune_util
12-
dune_trace
1312
fiber
1413
stdune
1514
unix)

src/dune_rpc_impl/server.ml

Lines changed: 21 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -27,53 +27,6 @@ module Session = Rpc.Server.Session
2727
module Handler = Rpc.Server.Handler
2828
module Csexp_rpc = Rpc.Csexp_rpc
2929

30-
module Run = struct
31-
type t =
32-
{ handler : Rpc.Server.t
33-
; action_runner : Action_runner.Rpc_server.t
34-
; pool : Fiber.Pool.t
35-
; root : string
36-
; where : Dune_rpc.Where.t
37-
; server : Csexp_rpc.Server.t Lazy.t
38-
; startup_ivar : (Csexp_rpc.Server.t, Exn_with_backtrace.t) result Fiber.Ivar.t
39-
; registry : [ `Add | `Skip ]
40-
; watch_mode : Watch_mode_config.t
41-
}
42-
43-
let run t =
44-
let registry = Rpc.Registry.create ~root:t.root ~where:t.where t.registry in
45-
let print_uncaught_rpc_error exn =
46-
Console.print [ Pp.text "Uncaught RPC Error"; Exn_with_backtrace.pp exn ]
47-
in
48-
let with_print_errors f () =
49-
Fiber.with_error_handler f ~on_error:(fun exn ->
50-
print_uncaught_rpc_error exn;
51-
Exn_with_backtrace.reraise exn)
52-
in
53-
let run () =
54-
let open Fiber.O in
55-
match Exn_with_backtrace.try_with (fun () -> Lazy.force t.server) with
56-
| Error exn ->
57-
Dune_trace.emit Rpc (fun () -> Dune_trace.Event.Rpc.startup_failure exn);
58-
print_uncaught_rpc_error exn;
59-
Fiber.Ivar.fill t.startup_ivar (Error exn)
60-
| Ok server ->
61-
let* () = Fiber.Ivar.fill t.startup_ivar (Ok server) in
62-
Fiber.fork_and_join_unit
63-
(fun () ->
64-
let* sessions = Csexp_rpc.Server.serve server in
65-
let () = Rpc.Registry.register registry in
66-
let* () = Rpc.Server.serve sessions t.handler in
67-
Fiber.Pool.close t.pool)
68-
(fun () -> Fiber.Pool.run t.pool)
69-
in
70-
with_print_errors run
71-
|> Fiber.finalize ~finally:(fun () ->
72-
Rpc.Registry.cleanup registry;
73-
Fiber.return ())
74-
;;
75-
end
76-
7730
type build_request =
7831
| Build of Dune_lang.Dep_conf.t list
7932
| Runtest of string list
@@ -114,7 +67,11 @@ module Clients = struct
11467
end
11568

11669
type server =
117-
{ config : Run.t
70+
{ lifecycle : Rpc.Server.Lifecycle.t
71+
; action_runner : Action_runner.Rpc_server.t
72+
; where : Dune_rpc.Where.t
73+
; registry : [ `Add | `Skip ]
74+
; watch_mode : Watch_mode_config.t
11875
; mutable clients : Clients.t
11976
}
12077

@@ -139,7 +96,7 @@ let pp_client_count t =
13996
;;
14097

14198
let refresh_client_count_status_line t =
142-
match t.server.config.registry with
99+
match t.server.registry with
143100
| `Skip -> ()
144101
| `Add -> Console.Status_line.refresh ()
145102
;;
@@ -152,9 +109,9 @@ let () =
152109
;;
153110

154111
let ready (t : t) =
155-
Fiber.Ivar.read t.server.config.startup_ivar
112+
Rpc.Server.Lifecycle.ready t.server.lifecycle
156113
>>= function
157-
| Ok server -> Csexp_rpc.Server.ready server
114+
| Ok () -> Fiber.return ()
158115
| Error exn ->
159116
Dune_util.Report_error.report exn;
160117
Scheduler.shutdown `Failure;
@@ -163,13 +120,8 @@ let ready (t : t) =
163120

164121
let stop (t : t) =
165122
Fiber.fork_and_join_unit
166-
(fun () -> Action_runner.Rpc_server.stop t.server.config.action_runner)
167-
(fun () ->
168-
Fiber.of_thunk (fun () ->
169-
match Fiber.Ivar.peek t.server.config.startup_ivar with
170-
| None -> Fiber.return ()
171-
| Some (Error _) -> Fiber.return ()
172-
| Some (Ok server) -> Csexp_rpc.Server.stop server))
123+
(fun () -> Action_runner.Rpc_server.stop t.server.action_runner)
124+
(fun () -> Rpc.Server.Lifecycle.stop t.server.lifecycle)
173125
;;
174126

175127
let current_errors () =
@@ -373,7 +325,7 @@ let handler (t : t Fdecl.t) action_runner_server : unit Handler.t =
373325
let () =
374326
let f _ () =
375327
let t = Fdecl.get t in
376-
match t.server.config.watch_mode with
328+
match t.server.watch_mode with
377329
| No -> Fiber.return `Not_in_watch_mode
378330
| Yes _ ->
379331
let+ () = Scheduler.flush_file_watcher () in
@@ -393,7 +345,7 @@ let handler (t : t Fdecl.t) action_runner_server : unit Handler.t =
393345
Session.Stage1.close entry.session))
394346
in
395347
let shutdown () =
396-
let* () = Csexp_rpc.Server.stop (Lazy.force t.server.config.server) in
348+
let* () = stop t in
397349
Scheduler.shutdown `Ok;
398350
Fiber.return ()
399351
in
@@ -490,18 +442,11 @@ let create ~registry ~root ~build watch_mode =
490442
in
491443
let action_runner = Action_runner.Rpc_server.create () in
492444
let handler = Rpc.Server.make (handler t action_runner) in
493-
{ Run.handler
494-
; action_runner
495-
; pool = Fiber.Pool.create ()
496-
; root
497-
; where
498-
; server
499-
; registry
500-
; startup_ivar = Fiber.Ivar.create ()
501-
; watch_mode
502-
}
445+
let lifecycle = Rpc.Server.Lifecycle.create ~handler ~root ~where ~registry ~server in
446+
action_runner, lifecycle
503447
in
504-
let server = { config; clients = Clients.empty } in
448+
let action_runner, lifecycle = config in
449+
let server = { lifecycle; action_runner; where; registry; watch_mode; clients = Clients.empty } in
505450
let res = { server; build } in
506451
current := Some server;
507452
Fdecl.set t res;
@@ -511,10 +456,10 @@ let create ~registry ~root ~build watch_mode =
511456
let run t =
512457
let run () =
513458
Fiber.fork_and_join_unit
514-
(fun () -> Run.run t.server.config)
515-
(fun () -> Action_runner.Rpc_server.run t.server.config.action_runner)
459+
(fun () -> Rpc.Server.Lifecycle.run t.server.lifecycle)
460+
(fun () -> Action_runner.Rpc_server.run t.server.action_runner)
516461
in
517-
match t.server.config.registry with
462+
match t.server.registry with
518463
| `Skip -> run ()
519464
| `Add ->
520465
let section = Console.Status_line.add_section (Live (fun () -> pp_client_count t)) in
@@ -578,5 +523,5 @@ end
578523

579524
let with_background_rpc = Background.with_background_rpc
580525
let ensure_ready = Background.ensure_ready
581-
let listening_address t = t.server.config.where
582-
let action_runner t = t.server.config.action_runner
526+
let listening_address t = t.server.where
527+
let action_runner t = t.server.action_runner

src/dune_rpc_impl/server.mli

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@ val create
2121
-> Watch_mode_config.t
2222
-> t
2323

24-
(** Stop accepting new rpc connections. Fiber returns when all existing
25-
connections terminate *)
26-
val stop : t -> unit Fiber.t
27-
28-
val ready : t -> unit Fiber.t
2924
val run : t -> unit Fiber.t
3025
val with_background_rpc : t -> (unit -> 'a Fiber.t) -> 'a Fiber.t
3126
val ensure_ready : unit -> unit Fiber.t

src/rpc/server.ml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
open Stdune
2+
module Rpc_registry = Registry
23
open Dune_rpc.Private
34
open Fiber.O
45
module Session_id = Stdune.Id.Make ()
@@ -883,3 +884,63 @@ let serve =
883884
in
884885
M.serve
885886
;;
887+
888+
module Lifecycle = struct
889+
type nonrec t =
890+
{ handler : t
891+
; registry : Rpc_registry.t
892+
; server : Csexp_rpc.Server.t Lazy.t
893+
; startup_ivar : (Csexp_rpc.Server.t, Exn_with_backtrace.t) result Fiber.Ivar.t
894+
}
895+
896+
let create ~handler ~root ~where ~registry ~server =
897+
let registry = Rpc_registry.create ~root ~where registry in
898+
{ handler; registry; server; startup_ivar = Fiber.Ivar.create () }
899+
;;
900+
901+
let print_uncaught_rpc_error exn =
902+
Console.print [ Pp.text "Uncaught RPC Error"; Exn_with_backtrace.pp exn ]
903+
;;
904+
905+
let run t =
906+
let with_print_errors f () =
907+
Fiber.with_error_handler f ~on_error:(fun exn ->
908+
print_uncaught_rpc_error exn;
909+
Exn_with_backtrace.reraise exn)
910+
in
911+
let run () =
912+
match Exn_with_backtrace.try_with (fun () -> Lazy.force t.server) with
913+
| Error exn ->
914+
Dune_trace.emit Rpc (fun () -> Dune_trace.Event.Rpc.startup_failure exn);
915+
print_uncaught_rpc_error exn;
916+
Fiber.Ivar.fill t.startup_ivar (Error exn)
917+
| Ok server ->
918+
let* () = Fiber.Ivar.fill t.startup_ivar (Ok server) in
919+
let* sessions = Csexp_rpc.Server.serve server in
920+
let () = Rpc_registry.register t.registry in
921+
serve sessions t.handler
922+
in
923+
with_print_errors run
924+
|> Fiber.finalize ~finally:(fun () ->
925+
Rpc_registry.cleanup t.registry;
926+
Fiber.return ())
927+
;;
928+
929+
let ready t =
930+
Fiber.Ivar.read t.startup_ivar
931+
>>= function
932+
| Ok server ->
933+
let+ () = Csexp_rpc.Server.ready server in
934+
Ok ()
935+
| Error _ as error -> Fiber.return error
936+
;;
937+
938+
let stop t =
939+
Fiber.of_thunk (fun () ->
940+
(* CR-soon rgrinberg: this is weird. we shouldn't be ignoring [stop] like this *)
941+
match Fiber.Ivar.peek t.startup_ivar with
942+
| None -> Fiber.return ()
943+
| Some (Error _) -> Fiber.return ()
944+
| Some (Ok server) -> Csexp_rpc.Server.stop server)
945+
;;
946+
end

src/rpc/server.mli

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,24 @@ type t
154154
val make : 'a Handler.t -> t
155155
val serve : Csexp_rpc.Session.t Fiber.Stream.In.t -> t -> unit Fiber.t
156156

157+
module Lifecycle : sig
158+
type handler
159+
type t
160+
161+
val create
162+
: handler:handler
163+
-> root:string
164+
-> where:Where.t
165+
-> registry:[ `Add | `Skip ]
166+
-> server:Csexp_rpc.Server.t Lazy.t
167+
-> t
168+
169+
val run : t -> unit Fiber.t
170+
val ready : t -> (unit, Exn_with_backtrace.t) result Fiber.t
171+
val stop : t -> unit Fiber.t
172+
end
173+
with type handler := t
174+
157175
(** Test only things below *)
158176

159177
module type Session = Server_intf.Session

0 commit comments

Comments
 (0)