Skip to content

Commit e2a2bdf

Browse files
committed
fix(watch): complete RPC builds independently
Introduce build requests with hidden completion ivars in the build system. The watch build loop can still batch RPC build requests together, but each request now reports success as soon as its own action builder and per-request finalization complete instead of waiting for the entire batch. Complete any unfinished RPC requests with the batch outcome when a build iteration ends, so requests cancelled by --stop-on-first-error do not hang. Update the concurrent RPC build cram test to expect the fast request to finish before the slow target is released. Signed-off-by: Rudi Grinberg <me@rgrinberg.com> refactor(watch): narrow RPC request completion hook Signed-off-by: Rudi Grinberg <me@rgrinberg.com> refactor(watch): use input generation for RPC completion Signed-off-by: Rudi Grinberg <me@rgrinberg.com> fix(watch): defer diff-promotion output and finish to batch end Per-request finalization now only saves the diff-promotion database without promoting files, so promotion output stays ordered after diff and error messages. When per-request completion triggers a restart signal but all RPC requests have already finished, treat the iteration as Done so that the watch status line is still emitted. Signed-off-by: Rudi Grinberg <me@rgrinberg.com> refactor(watch): split RPC request checkpointing from completion Checkpoint request side effects before completing RPC requests, but keep build-wide diff-promotion finalization at the end of the build iteration. Defer request completion under --auto-promote after a checkpoint saves diff promotions, so watch mode can still restart after the final auto-promotion. This restores ordinary auto-promote output ordering while keeping saved promotions visible to RPC promotion requests. Signed-off-by: Rudi Grinberg <me@rgrinberg.com>
1 parent e3fa370 commit e2a2bdf

8 files changed

Lines changed: 299 additions & 71 deletions

File tree

bin/build.ml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ let action_builder_of_request request =
66
;;
77

88
let run_build_system ~run_id ~request =
9-
Dune_engine.Build_system.run_action_builder ~run_id (action_builder_of_request request)
9+
let request =
10+
action_builder_of_request request |> Dune_engine.Build_system.Build_request.create
11+
in
12+
Dune_engine.Build_system.run_build_requests ~run_id [ request ]
1013
;;
1114

1215
let rpc_request_action ~(common : Common.t) (kind : Dune_rpc_impl.Server.build_request) =

src/dune_engine/build_loop.ml

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ module Rpc_request_id = struct
4343
module Map = C.Map
4444
end
4545

46-
type rpc_request =
47-
{ build : unit Action_builder.t
48-
; outcome : Build_outcome.t Fiber.Ivar.t
49-
}
50-
5146
type t =
5247
{ mutable status : status
5348
; mutable pending_reset : Memo.Invalidation.t option
@@ -58,7 +53,7 @@ type t =
5853
(* Incremented whenever an invalidation is recorded. This lets waiters
5954
distinguish input changes from other wakeups. *)
6055
; mutable next_run_id : int
61-
; mutable rpc_requests : rpc_request Rpc_request_id.Map.t
56+
; mutable rpc_requests : Build_system.Build_request.t Rpc_request_id.Map.t
6257
; mutable state : [ `Awaiting_init | `Init ]
6358
}
6459

@@ -221,14 +216,20 @@ let reset_wakeup t =
221216
Trigger.trigger previous_wakeup
222217
;;
223218

219+
let all_rpc_requests_finished t =
220+
Rpc_request_id.Map.to_list t.rpc_requests
221+
|> List.for_all ~f:(fun (_, request) -> Build_system.Build_request.is_finished request)
222+
;;
223+
224224
let run_current_build
225225
({ watch_restart_started_at = restart_started_at
226226
; input_change_generation = build_start_input_change_generation
227227
; wakeup_generation = build_start_wakeup_generation
228228
; _
229229
} as t)
230230
~run_id
231-
build
231+
~finish_when_rpc_requests_finished
232+
run_build
232233
=
233234
let* () = reset_wakeup t in
234235
(match t.status with
@@ -237,10 +238,15 @@ let run_current_build
237238
t.status <- Building run_id;
238239
let+ outcome =
239240
Scheduler.with_current_build_cancellation (Fiber.Cancel.create ()) (fun () ->
240-
Build_system.run_action_builder ?restart_started_at ~run_id build)
241+
run_build ?restart_started_at ~run_id ())
241242
in
242243
let next =
243244
match t.status with
245+
| Restarting_build _
246+
when finish_when_rpc_requests_finished && all_rpc_requests_finished t ->
247+
t.status <- Standing_by;
248+
t.watch_restart_started_at <- None;
249+
`Done
244250
| Restarting_build _ -> `Restart
245251
| Standing_by | Building _ ->
246252
t.status <- Standing_by;
@@ -260,6 +266,13 @@ let rec wait_for_wakeup_after t wakeup_generation =
260266
wait_for_wakeup_after t wakeup_generation)
261267
;;
262268

269+
let remove_finished_rpc_requests t =
270+
Rpc_request_id.Map.to_list t.rpc_requests
271+
|> List.iter ~f:(fun (id, request) ->
272+
if Build_system.Build_request.is_finished request
273+
then t.rpc_requests <- Rpc_request_id.Map.remove t.rpc_requests id)
274+
;;
275+
263276
let cancel_rpc_requests t ~f =
264277
let* () = Fiber.return () in
265278
match
@@ -270,11 +283,17 @@ let cancel_rpc_requests t ~f =
270283
| to_cancel ->
271284
List.iter to_cancel ~f:(fun (id, _) ->
272285
t.rpc_requests <- Rpc_request_id.Map.remove t.rpc_requests id);
286+
let to_cancel_unfinished =
287+
List.filter to_cancel ~f:(fun (_, request) ->
288+
not (Build_system.Build_request.is_finished request))
289+
in
273290
let* () =
274-
Fiber.parallel_iter to_cancel ~f:(fun (_, { outcome; _ }) ->
275-
Fiber.Ivar.fill outcome Failure)
291+
Fiber.parallel_iter to_cancel_unfinished ~f:(fun (_, request) ->
292+
Build_system.Build_request.complete request Failure)
276293
in
277-
request_rebuild_due_to_rpc_request t
294+
if List.is_empty to_cancel_unfinished
295+
then Fiber.return ()
296+
else request_rebuild_due_to_rpc_request t
278297
;;
279298

280299
let cancel_rpc_requests_by_session t ~session_id =
@@ -285,18 +304,19 @@ let cancel_rpc_requests_by_session t ~session_id =
285304
let cancel_all_rpc_requests t = cancel_rpc_requests t ~f:(fun _ _ -> true)
286305

287306
let submit_rpc_request t ~session_id ~request_id ~build =
288-
let outcome = Fiber.Ivar.create () in
307+
let request = Build_system.Build_request.create build in
289308
let id = Rpc_request_id.create ~session_id ~request_id in
290309
let* () = Fiber.return () in
291310
(match Rpc_request_id.Map.find t.rpc_requests id with
292311
| Some _ ->
293312
Code_error.raise
294313
"RPC build request with this id is already active"
295314
[ "id", Rpc_request_id.to_dyn id ]
296-
| None ->
297-
t.rpc_requests <- Rpc_request_id.Map.add_exn t.rpc_requests id { build; outcome });
315+
| None -> t.rpc_requests <- Rpc_request_id.Map.add_exn t.rpc_requests id request);
298316
let* () = request_rebuild_due_to_rpc_request t in
299-
Fiber.Ivar.read outcome
317+
let+ outcome = Build_system.Build_request.await request in
318+
t.rpc_requests <- Rpc_request_id.Map.remove t.rpc_requests id;
319+
outcome
300320
;;
301321

302322
type rpc_poll_iter_result =
@@ -306,6 +326,7 @@ type rpc_poll_iter_result =
306326

307327
let rpc_poll_iter t ~sticky_goal ~sticky_built_at =
308328
let rec loop () =
329+
remove_finished_rpc_requests t;
309330
let rpc_requests = Rpc_request_id.Map.to_list t.rpc_requests in
310331
let sticky_goal_to_build =
311332
match sticky_goal with
@@ -329,11 +350,12 @@ let rpc_poll_iter t ~sticky_goal ~sticky_built_at =
329350
Fiber.return { wakeup_generation = t.wakeup_generation; sticky_built_at = None }
330351
| _ ->
331352
let* res, next, input_change_generation, wakeup_generation =
332-
let build =
333-
let builds = List.map rpc_requests ~f:(fun (_, { build; _ }) -> build) in
353+
let requests =
354+
let rpc_requests = List.map rpc_requests ~f:snd in
334355
match sticky_goal_to_build with
335-
| None -> Action_builder.all_unit builds
336-
| Some sticky_goal -> Action_builder.all_unit (sticky_goal :: builds)
356+
| None -> rpc_requests
357+
| Some sticky_goal ->
358+
Build_system.Build_request.create sticky_goal :: rpc_requests
337359
in
338360
let run_id = next_watch_run_id t in
339361
let () =
@@ -347,7 +369,19 @@ let rpc_poll_iter t ~sticky_goal ~sticky_built_at =
347369
Console.maybe_clear_screen ~details_hum);
348370
Memo.reset invalidation
349371
in
350-
run_current_build t ~run_id build
372+
let build_input_change_generation = t.input_change_generation in
373+
let finish_when_rpc_requests_finished = Option.is_none sticky_goal_to_build in
374+
run_current_build
375+
t
376+
~run_id
377+
~finish_when_rpc_requests_finished
378+
(fun ?restart_started_at ~run_id () ->
379+
Build_system.run_build_requests
380+
?restart_started_at
381+
~run_id
382+
~should_complete_request:(fun () ->
383+
Int.equal t.input_change_generation build_input_change_generation)
384+
requests)
351385
in
352386
(match next with
353387
| `Restart -> loop ()
@@ -358,15 +392,13 @@ let rpc_poll_iter t ~sticky_goal ~sticky_built_at =
358392
| Ok () -> Build_outcome.Success
359393
| Error `Already_reported -> Failure
360394
in
361-
let+ () =
362-
Fiber.sequential_iter rpc_requests ~f:(fun (id, _) ->
363-
match Rpc_request_id.Map.find t.rpc_requests id with
364-
| None -> Fiber.return ()
365-
| Some { outcome = ivar; _ } ->
366-
t.rpc_requests <- Rpc_request_id.Map.remove t.rpc_requests id;
367-
Fiber.Ivar.fill ivar outcome)
395+
let* () =
396+
Fiber.sequential_iter rpc_requests ~f:(fun (id, request) ->
397+
t.rpc_requests <- Rpc_request_id.Map.remove t.rpc_requests id;
398+
Build_system.Build_request.complete request outcome)
368399
in
369-
build_finish outcome
400+
build_finish outcome;
401+
Fiber.return ()
370402
in
371403
{ wakeup_generation
372404
; sticky_built_at =

src/dune_engine/build_system.ml

Lines changed: 98 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,10 +1121,19 @@ let handle_final_exns exns =
11211121
List.iter exns ~f:Dune_util.Report_error.report)
11221122
;;
11231123

1124-
let run ?restart_started_at ?(run_id = Run_id.Batch) f =
1125-
let finalize_diff_promotion () =
1126-
protect ~f:Diff_promotion.finalize ~finally:Diff_promotion.clear_cache
1127-
in
1124+
let finalize_diff_promotion () =
1125+
protect ~f:Diff_promotion.finalize ~finally:Diff_promotion.clear_cache
1126+
;;
1127+
1128+
let checkpoint_build_request () =
1129+
let open Fiber.O in
1130+
Target_promotion.save ();
1131+
let diff_promotions_saved = Diff_promotion.save () in
1132+
let+ () = Scheduler.flush_file_watcher () in
1133+
diff_promotions_saved
1134+
;;
1135+
1136+
let run_with_error_collection ?restart_started_at ?(run_id = Run_id.Batch) collect_errors =
11281137
let open Fiber.O in
11291138
let f () =
11301139
let start = Time.now () in
@@ -1144,10 +1153,7 @@ let run ?restart_started_at ?(run_id = Run_id.Batch) f =
11441153
Fs_memo.invalidate_cached_timestamps ();
11451154
let* () = State.reset_progress () in
11461155
let* () = State.reset_errors () in
1147-
let* outcome =
1148-
Fiber.collect_errors (fun () ->
1149-
Memo.run_with_error_handler f ~handle_error_no_raise:report_early_exn)
1150-
in
1156+
let* outcome = collect_errors () in
11511157
Dtemp.clear ();
11521158
Sandbox.cleanup_pending_targets ();
11531159
Target_promotion.save ();
@@ -1193,6 +1199,12 @@ let run ?restart_started_at ?(run_id = Run_id.Batch) f =
11931199
Scheduler.with_current_build_cancellation (Fiber.Cancel.create ()) f)
11941200
;;
11951201

1202+
let run ?restart_started_at ?run_id f =
1203+
run_with_error_collection ?restart_started_at ?run_id (fun () ->
1204+
Fiber.collect_errors (fun () ->
1205+
Memo.run_with_error_handler f ~handle_error_no_raise:report_early_exn))
1206+
;;
1207+
11961208
let run_exn f =
11971209
let open Fiber.O in
11981210
run f
@@ -1201,12 +1213,84 @@ let run_exn f =
12011213
| Error `Already_reported -> raise Dune_util.Report_error.Already_reported
12021214
;;
12031215

1204-
let run_action_builder ?restart_started_at ?run_id request =
1205-
run ?restart_started_at ?run_id (fun () ->
1206-
let+ (), (_ : Dep.Fact.t Dep.Map.t) =
1207-
Action_builder.evaluate_and_collect_facts request
1208-
in
1209-
())
1216+
let evaluate_action_builder request =
1217+
let+ (), (_ : Dep.Fact.t Dep.Map.t) =
1218+
Action_builder.evaluate_and_collect_facts request
1219+
in
1220+
()
1221+
;;
1222+
1223+
module Build_request = struct
1224+
type t =
1225+
{ build : unit Action_builder.t
1226+
; outcome : Build_outcome.t Fiber.Ivar.t
1227+
}
1228+
1229+
let create build = { build; outcome = Fiber.Ivar.create () }
1230+
let await t = Fiber.Ivar.read t.outcome
1231+
let is_finished t = Option.is_some (Fiber.Ivar.peek t.outcome)
1232+
1233+
let complete t outcome =
1234+
Fiber.of_thunk (fun () ->
1235+
match Fiber.Ivar.peek t.outcome with
1236+
| Some _ -> Fiber.return ()
1237+
| None -> Fiber.Ivar.fill t.outcome outcome)
1238+
;;
1239+
1240+
let build t = t.build
1241+
end
1242+
1243+
let run_build_requests
1244+
?restart_started_at
1245+
?run_id
1246+
?(should_complete_request = fun () -> true)
1247+
requests
1248+
=
1249+
let open Fiber.O in
1250+
let run_action_builder_fiber request =
1251+
Fiber.collect_errors (fun () ->
1252+
Memo.run_with_error_handler
1253+
(fun () -> evaluate_action_builder request)
1254+
~handle_error_no_raise:report_early_exn)
1255+
in
1256+
let diff_promotions_saved = ref false in
1257+
let checkpoint_request () =
1258+
let+ saved = checkpoint_build_request () in
1259+
if saved then diff_promotions_saved := true
1260+
in
1261+
let should_defer_completion () =
1262+
match !Clflags.promote with
1263+
| Some Automatically -> !diff_promotions_saved
1264+
| Some Never | None -> false
1265+
in
1266+
let finish_request request outcome =
1267+
let* () = checkpoint_request () in
1268+
if should_complete_request () && not (should_defer_completion ())
1269+
then Build_request.complete request outcome
1270+
else Fiber.return ()
1271+
in
1272+
let run_request request =
1273+
Build_request.build request
1274+
|> run_action_builder_fiber
1275+
>>= function
1276+
| Ok () ->
1277+
let+ () = finish_request request Build_outcome.Success in
1278+
Ok ()
1279+
| Error exns when List.for_all exns ~f:caused_by_cancellation ->
1280+
Fiber.return (Error exns)
1281+
| Error exns ->
1282+
let+ () = finish_request request Build_outcome.Failure in
1283+
Error exns
1284+
in
1285+
run_with_error_collection ?restart_started_at ?run_id (fun () ->
1286+
let+ results = Fiber.parallel_map requests ~f:run_request in
1287+
match
1288+
List.concat_map results ~f:(function
1289+
| Ok () -> []
1290+
| Error exns -> exns)
1291+
with
1292+
| [] -> Ok ()
1293+
| exns -> Error exns)
12101294
;;
12111295
12121296
let build_file p =

src/dune_engine/build_system.mli

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,20 @@ val run
6060
(** A variant of [run] that raises an [Already_reported] exception on error. *)
6161
val run_exn : (unit -> 'a Memo.t) -> 'a Fiber.t
6262

63-
val run_action_builder
63+
module Build_request : sig
64+
type t
65+
66+
val create : unit Action_builder.t -> t
67+
val await : t -> Build_outcome.t Fiber.t
68+
val is_finished : t -> bool
69+
val complete : t -> Build_outcome.t -> unit Fiber.t
70+
end
71+
72+
val run_build_requests
6473
: ?restart_started_at:Time.t
6574
-> ?run_id:Run_id.t
66-
-> unit Action_builder.t
75+
-> ?should_complete_request:(unit -> bool)
76+
-> Build_request.t list
6777
-> (unit, [ `Already_reported ]) result Fiber.t
6878

6979
(** {2 Misc} *)

0 commit comments

Comments
 (0)