Skip to content

Commit 83f4517

Browse files
authored
CP-52821: use Mtime in Xapi_periodic_scheduler (#6161)
Target: feature/perf Note that this conflicts with 7e9310f?diff=unified&w=1, but is a pre-requisite of #6126
2 parents 0b34302 + 6b6c6c5 commit 83f4517

File tree

7 files changed

+67
-56
lines changed

7 files changed

+67
-56
lines changed

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
(public_name xapi-stdext-threads.scheduler)
1919
(name xapi_stdext_threads_scheduler)
2020
(modules ipq scheduler)
21-
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads)
21+
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock)
2222
)
2323

2424
(tests

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml

+7-7
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
*)
1414
(* Imperative priority queue *)
1515

16-
type 'a event = {ev: 'a; time: Mtime.t}
16+
type 'a event = {ev: 'a; time: Mtime.span}
1717

1818
type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array}
1919

@@ -23,7 +23,7 @@ let create n default =
2323
if n <= 0 then
2424
invalid_arg "create"
2525
else
26-
let default = {ev= default; time= Mtime_clock.now ()} in
26+
let default = {ev= default; time= Mtime_clock.elapsed ()} in
2727
{default; size= 0; data= Array.make n default}
2828

2929
let is_empty h = h.size <= 0
@@ -45,7 +45,7 @@ let add h x =
4545
(* moving [x] up in the heap *)
4646
let rec moveup i =
4747
let fi = (i - 1) / 2 in
48-
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
48+
if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then (
4949
d.(i) <- d.(fi) ;
5050
moveup fi
5151
) else
@@ -69,7 +69,7 @@ let remove h s =
6969
(* moving [x] up in the heap *)
7070
let rec moveup i =
7171
let fi = (i - 1) / 2 in
72-
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
72+
if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then (
7373
d.(i) <- d.(fi) ;
7474
moveup fi
7575
) else
@@ -83,7 +83,7 @@ let remove h s =
8383
let j' = j + 1 in
8484
if j' < n && d.(j').time < d.(j).time then j' else j
8585
in
86-
if Mtime.is_earlier d.(j).time ~than:x.time then (
86+
if Mtime.Span.is_shorter d.(j).time ~than:x.time then (
8787
d.(i) <- d.(j) ;
8888
movedown j
8989
) else
@@ -93,7 +93,7 @@ let remove h s =
9393
in
9494
if s = n then
9595
()
96-
else if Mtime.is_later d.(s).time ~than:x.time then
96+
else if Mtime.Span.is_longer d.(s).time ~than:x.time then
9797
moveup s
9898
else
9999
movedown s ;
@@ -129,7 +129,7 @@ let check h =
129129
let d = h.data in
130130
for i = 1 to h.size - 1 do
131131
let fi = (i - 1) / 2 in
132-
let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in
132+
let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in
133133
assert ordered
134134
done
135135

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15-
type 'a event = {ev: 'a; time: Mtime.t}
15+
type 'a event = {ev: 'a; time: Mtime.span}
1616

1717
type 'a t
1818

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml

+7-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq
1717
(* test we get "out of bound" exception calling Ipq.remove *)
1818
let test_out_of_index () =
1919
let q = Ipq.create 10 0 in
20-
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
20+
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ;
2121
let is_oob = function
2222
| Invalid_argument s when String.ends_with ~suffix:" out of bounds" s ->
2323
true
@@ -43,18 +43,18 @@ let test_leak () =
4343
let use_array () = array.(0) <- 'a' in
4444
let allocated = Atomic.make true in
4545
Gc.finalise (fun _ -> Atomic.set allocated false) array ;
46-
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ;
46+
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ;
4747
Ipq.remove q 0 ;
4848
Gc.full_major () ;
4949
Gc.full_major () ;
5050
Alcotest.(check bool) "allocated" false (Atomic.get allocated) ;
51-
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()}
51+
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()}
5252

5353
(* test Ipq.is_empty call *)
5454
let test_empty () =
5555
let q = Ipq.create 10 0 in
5656
Alcotest.(check bool) "same value" true (Ipq.is_empty q) ;
57-
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
57+
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ;
5858
Alcotest.(check bool) "same value" false (Ipq.is_empty q) ;
5959
Ipq.remove q 0 ;
6060
Alcotest.(check bool) "same value" true (Ipq.is_empty q)
@@ -75,7 +75,7 @@ let set queue =
7575
Ipq.iter
7676
(fun d ->
7777
let t = d.time in
78-
let t = Mtime.to_uint64_ns t in
78+
let t = Mtime.Span.to_uint64_ns t in
7979
s := Int64Set.add t !s
8080
)
8181
queue ;
@@ -86,7 +86,7 @@ let test_old () =
8686
let s = ref Int64Set.empty in
8787
let add i =
8888
let ti = Random.int64 1000000L in
89-
let t = Mtime.of_uint64_ns ti in
89+
let t = Mtime.Span.of_uint64_ns ti in
9090
let e = {Ipq.time= t; Ipq.ev= i} in
9191
Ipq.add test e ;
9292
s := Int64Set.add ti !s
@@ -123,7 +123,7 @@ let test_old () =
123123
let prev = ref 0L in
124124
for _ = 0 to 49 do
125125
let e = Ipq.pop_maximum test in
126-
let t = Mtime.to_uint64_ns e.time in
126+
let t = Mtime.Span.to_uint64_ns e.time in
127127
Alcotest.(check bool)
128128
(Printf.sprintf "%Ld bigger than %Ld" t !prev)
129129
true (t >= !prev) ;

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml

+23-26
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,23 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default
3333

3434
let lock = Mutex.create ()
3535

36-
module Clock = struct
37-
let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9))
38-
39-
let span_to_s span =
40-
Mtime.Span.to_uint64_ns span |> Int64.to_float |> fun ns -> ns /. 1e9
41-
42-
let add_span clock secs =
43-
(* return mix or max available value if the add overflows *)
44-
match Mtime.add_span clock (span secs) with
45-
| Some t ->
46-
t
47-
| None when secs > 0. ->
48-
Mtime.max_stamp
49-
| None ->
50-
Mtime.min_stamp
51-
end
52-
53-
let add_to_queue name ty start newfunc =
54-
let ( ++ ) = Clock.add_span in
36+
let add_to_queue_span name ty start_span newfunc =
37+
let ( ++ ) = Mtime.Span.add in
5538
let item =
56-
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
39+
{
40+
Ipq.ev= {func= newfunc; ty; name}
41+
; Ipq.time= Mtime_clock.elapsed () ++ start_span
42+
}
5743
in
5844
with_lock lock (fun () -> Ipq.add queue item) ;
5945
Delay.signal delay
6046

47+
let add_to_queue name ty start newfunc =
48+
let start_span =
49+
Clock.Timer.s_to_span start |> Option.value ~default:Mtime.Span.max_span
50+
in
51+
add_to_queue_span name ty start_span newfunc
52+
6153
let remove_from_queue name =
6254
with_lock lock @@ fun () ->
6355
match !pending_event with
@@ -72,8 +64,11 @@ let add_periodic_pending () =
7264
with_lock lock @@ fun () ->
7365
match !pending_event with
7466
| Some ({ty= Periodic timer; _} as ev) ->
75-
let ( ++ ) = Clock.add_span in
76-
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
67+
let ( ++ ) = Mtime.Span.add in
68+
let delta =
69+
Clock.Timer.s_to_span timer |> Option.value ~default:Mtime.Span.max_span
70+
in
71+
let item = {Ipq.ev; Ipq.time= Mtime_clock.elapsed () ++ delta} in
7772
Ipq.add queue item ;
7873
pending_event := None
7974
| Some {ty= OneShot; _} ->
@@ -85,15 +80,15 @@ let loop () =
8580
debug "%s started" __MODULE__ ;
8681
try
8782
while true do
88-
let now = Mtime_clock.now () in
83+
let now = Mtime_clock.elapsed () in
8984
let deadline, item =
9085
with_lock lock @@ fun () ->
9186
(* empty: wait till we get something *)
9287
if Ipq.is_empty queue then
93-
(Clock.add_span now 10.0, None)
88+
(Mtime.Span.add now Mtime.Span.(10 * s), None)
9489
else
9590
let next = Ipq.maximum queue in
96-
if Mtime.is_later next.Ipq.time ~than:now then
91+
if Mtime.Span.is_longer next.Ipq.time ~than:now then
9792
(* not expired: wait till time or interrupted *)
9893
(next.Ipq.time, None)
9994
else (
@@ -111,7 +106,9 @@ let loop () =
111106
| None -> (
112107
(* Sleep until next event. *)
113108
let sleep =
114-
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
109+
Mtime.(Span.abs_diff deadline now)
110+
|> Mtime.Span.(add ms)
111+
|> Clock.Timer.span_to_s
115112
in
116113
try ignore (Delay.wait delay sleep)
117114
with e ->

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ type func_ty =
1818
| OneShot (** Fire just once *)
1919
| Periodic of float (** Fire periodically with a given period in seconds *)
2020

21+
val add_to_queue_span :
22+
string -> func_ty -> Mtime.span -> (unit -> unit) -> unit
23+
(** Start a new timer. *)
24+
2125
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
2226
(** Start a new timer. *)
2327

ocaml/xapi/xapi_event.ml

+24-14
Original file line numberDiff line numberDiff line change
@@ -419,20 +419,25 @@ module From = struct
419419

420420
let session_is_invalid call = with_lock call.m (fun () -> call.session_invalid)
421421

422-
let wait2 call from_id deadline =
422+
let wait2 call from_id timer =
423423
let timeoutname = Printf.sprintf "event_from_timeout_%Ld" call.index in
424424
with_lock m (fun () ->
425425
while
426426
from_id = call.cur_id
427427
&& (not (session_is_invalid call))
428-
&& Unix.gettimeofday () < deadline
428+
&& not (Clock.Timer.has_expired timer)
429429
do
430-
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue timeoutname
431-
Xapi_stdext_threads_scheduler.Scheduler.OneShot
432-
(deadline -. Unix.gettimeofday () +. 0.5)
433-
(fun () -> Condition.broadcast c) ;
434-
Condition.wait c m ;
435-
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue timeoutname
430+
match Clock.Timer.remaining timer with
431+
| Expired _ ->
432+
()
433+
| Remaining delta ->
434+
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue_span
435+
timeoutname Xapi_stdext_threads_scheduler.Scheduler.OneShot
436+
delta (fun () -> Condition.broadcast c
437+
) ;
438+
Condition.wait c m ;
439+
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue
440+
timeoutname
436441
done
437442
) ;
438443
if session_is_invalid call then (
@@ -506,7 +511,7 @@ let rec next ~__context =
506511
else
507512
rpc_of_events relevant
508513

509-
let from_inner __context session subs from from_t deadline =
514+
let from_inner __context session subs from from_t timer =
510515
let open Xapi_database in
511516
let open From in
512517
(* The database tables involved in our subscription *)
@@ -605,14 +610,14 @@ let from_inner __context session subs from from_t deadline =
605610
&& mods = []
606611
&& deletes = []
607612
&& messages = []
608-
&& Unix.gettimeofday () < deadline
613+
&& not (Clock.Timer.has_expired timer)
609614
then (
610615
last_generation := last ;
611616
(* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
612617
sub.cur_id <- last ;
613618
(* last id the client got is equivalent to the current one *)
614619
last_msg_gen := msg_gen ;
615-
wait2 sub last deadline ;
620+
wait2 sub last timer ;
616621
Thread.delay 0.05 ;
617622
grab_nonempty_range ()
618623
) else
@@ -705,14 +710,19 @@ let from ~__context ~classes ~token ~timeout =
705710
)
706711
in
707712
let subs = List.map Subscription.of_string classes in
708-
let deadline = Unix.gettimeofday () +. timeout in
713+
let duration =
714+
timeout
715+
|> Clock.Timer.s_to_span
716+
|> Option.value ~default:Mtime.Span.(24 * hour)
717+
in
718+
let timer = Clock.Timer.start ~duration in
709719
(* We need to iterate because it's possible for an empty event set
710720
to be generated if we peek in-between a Modify and a Delete; we'll
711721
miss the Delete event and fail to generate the Modify because the
712722
snapshot can't be taken. *)
713723
let rec loop () =
714-
let event_from = from_inner __context session subs from from_t deadline in
715-
if event_from.events = [] && Unix.gettimeofday () < deadline then (
724+
let event_from = from_inner __context session subs from from_t timer in
725+
if event_from.events = [] && not (Clock.Timer.has_expired timer) then (
716726
debug "suppressing empty event.from" ;
717727
loop ()
718728
) else

0 commit comments

Comments
 (0)