Skip to content

Commit f2dcc67

Browse files
committed
CP-307958: Various pool optimizations
Signed-off-by: Edwin Török <[email protected]>
4 parents d215b36 + 40e57b0 + 10b4f49 + fbb7117 commit f2dcc67

11 files changed

+63
-63
lines changed

ocaml/database/db_backend.ml

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ let db_FLUSH_TIMER = 2.0
2121

2222
(* --------------------- Util functions on db datastructures *)
2323

24-
let master_database = ref (Db_cache_types.Database.make Schema.empty)
24+
let master_database = Atomic.make (Db_cache_types.Database.make Schema.empty)
2525

26-
let __test_set_master_database db = master_database := db
26+
let __test_set_master_database db = Atomic.set master_database db
2727

28-
let make () = Db_ref.in_memory (ref master_database)
28+
let make () = Db_ref.in_memory master_database
2929

3030
(* !!! Right now this is called at cache population time. It would probably be preferable to call it on flush time instead, so we
3131
don't waste writes storing non-persistent field values on disk.. At the moment there's not much to worry about, since there are

ocaml/database/db_connections.ml

+3-13
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,12 @@ let preferred_write_db () = List.hd (Db_conn_store.read_db_connections ())
6262
let exit_on_next_flush = ref false
6363

6464
(* db flushing thread refcount: the last thread out of the door does the exit(0) when flush_on_exit is true *)
65-
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
65+
let db_flush_thread_refcount = Atomic.make 0
6666

67-
let db_flush_thread_refcount_m = Mutex.create ()
68-
69-
let db_flush_thread_refcount = ref 0
70-
71-
let inc_db_flush_thread_refcount () =
72-
with_lock db_flush_thread_refcount_m (fun () ->
73-
db_flush_thread_refcount := !db_flush_thread_refcount + 1
74-
)
67+
let inc_db_flush_thread_refcount () = Atomic.incr db_flush_thread_refcount
7568

7669
let dec_and_read_db_flush_thread_refcount () =
77-
with_lock db_flush_thread_refcount_m (fun () ->
78-
db_flush_thread_refcount := !db_flush_thread_refcount - 1 ;
79-
!db_flush_thread_refcount
80-
)
70+
Atomic.fetch_and_add db_flush_thread_refcount (-1)
8171

8272
let pre_exit_hook () =
8373
(* We're about to exit. Close the active redo logs. *)

ocaml/database/db_lock.ml

+5-12
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ module ReentrantLock : REENTRANT_LOCK = struct
5959
type t = {
6060
holder: tid option Atomic.t (* The holder of the lock *)
6161
; mutable holds: int (* How many holds the holder has on the lock *)
62-
; lock: Mutex.t (* Barrier to signal waiting threads *)
63-
; condition: Condition.t
64-
(* Waiting threads are signalled via this condition to reattempt to acquire the lock *)
62+
; lock: Mutex.t (* Mutex held by the holder thread *)
6563
; statistics: statistics (* Bookkeeping of time taken to acquire lock *)
6664
}
6765

@@ -73,7 +71,6 @@ module ReentrantLock : REENTRANT_LOCK = struct
7371
holder= Atomic.make None
7472
; holds= 0
7573
; lock= Mutex.create ()
76-
; condition= Condition.create ()
7774
; statistics= create_statistics ()
7875
}
7976

@@ -94,17 +91,15 @@ module ReentrantLock : REENTRANT_LOCK = struct
9491
let intended = Some me in
9592
let counter = Mtime_clock.counter () in
9693
Mutex.lock l.lock ;
97-
while not (Atomic.compare_and_set l.holder None intended) do
98-
Condition.wait l.condition l.lock
99-
done ;
94+
Atomic.set l.holder intended ;
10095
lock_acquired () ;
10196
let stats = l.statistics in
10297
let delta = Clock.Timer.span_to_s (Mtime_clock.count counter) in
10398
stats.total_time <- stats.total_time +. delta ;
10499
stats.min_time <- Float.min delta stats.min_time ;
105100
stats.max_time <- Float.max delta stats.max_time ;
106101
stats.acquires <- stats.acquires + 1 ;
107-
Mutex.unlock l.lock ;
102+
(* do not unlock, it will be done when holds reaches 0 instead *)
108103
l.holds <- 1
109104

110105
let unlock l =
@@ -114,10 +109,8 @@ module ReentrantLock : REENTRANT_LOCK = struct
114109
l.holds <- l.holds - 1 ;
115110
if l.holds = 0 then (
116111
let () = Atomic.set l.holder None in
117-
Mutex.lock l.lock ;
118-
Condition.signal l.condition ;
119-
Mutex.unlock l.lock ;
120-
lock_released ()
112+
(* the lock is held (acquired in [lock]), we only need to unlock *)
113+
Mutex.unlock l.lock ; lock_released ()
121114
)
122115
| _ ->
123116
failwith

ocaml/database/db_ref.ml

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15-
type t = In_memory of Db_cache_types.Database.t ref ref | Remote
15+
type t = In_memory of Db_cache_types.Database.t Atomic.t | Remote
1616

1717
exception Database_not_in_memory
1818

19-
let in_memory (rf : Db_cache_types.Database.t ref ref) = In_memory rf
19+
let in_memory (rf : Db_cache_types.Database.t Atomic.t) = In_memory rf
2020

2121
let get_database = function
2222
| In_memory x ->
23-
!(!x)
23+
Atomic.get x
2424
| Remote ->
2525
raise Database_not_in_memory
2626

2727
let update_database t f =
2828
match t with
2929
| In_memory x ->
3030
let d : Db_cache_types.Database.t = f (get_database t) in
31-
!x := d
31+
Atomic.set x d
3232
| Remote ->
3333
raise Database_not_in_memory

ocaml/database/db_ref.mli

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15-
type t = In_memory of Db_cache_types.Database.t ref ref | Remote
15+
type t = In_memory of Db_cache_types.Database.t Atomic.t | Remote
1616

1717
exception Database_not_in_memory
1818

19-
val in_memory : Db_cache_types.Database.t ref ref -> t
19+
val in_memory : Db_cache_types.Database.t Atomic.t -> t
2020

2121
val get_database : t -> Db_cache_types.Database.t
2222

ocaml/database/db_remote_cache_access_v1.ml

+2-5
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ module DBCacheRemoteListener = struct
66

77
exception DBCacheListenerUnknownMessageName of string
88

9-
let ctr_mutex = Mutex.create ()
10-
11-
let calls_processed = ref 0
9+
let calls_processed = Atomic.make 0
1210

1311
let success xml =
1412
let resp = XMLRPC.To.array [XMLRPC.To.string "success"; xml] in
@@ -34,8 +32,7 @@ module DBCacheRemoteListener = struct
3432
Note that, although the messages still contain the pool_secret for historical reasons,
3533
access has already been applied by the RBAC code in Xapi_http.add_handler. *)
3634
let process_xmlrpc xml =
37-
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute in
38-
with_lock ctr_mutex (fun () -> calls_processed := !calls_processed + 1) ;
35+
Atomic.incr calls_processed ;
3936
let fn_name, args =
4037
match XMLRPC.From.array (fun x -> x) xml with
4138
| [fn_name; _; args] ->

ocaml/database/redo_log.ml

+9-17
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ type redo_log_conf = {
7777
; backoff_delay: int ref
7878
; sock: Unix.file_descr option ref
7979
; pid: (Forkhelpers.pidty * string * string) option ref
80-
; dying_processes_mutex: Mutex.t
81-
; num_dying_processes: int ref
80+
; num_dying_processes: int Atomic.t
8281
; mutex: Mutex.t (** exclusive access to this configuration *)
8382
}
8483

@@ -585,14 +584,10 @@ let shutdown log =
585584
(Thread.create
586585
(fun () ->
587586
D.debug "Waiting for I/O process with pid %d to die..." ipid ;
588-
with_lock log.dying_processes_mutex (fun () ->
589-
log.num_dying_processes := !(log.num_dying_processes) + 1
590-
) ;
587+
Atomic.incr log.num_dying_processes ;
591588
ignore (Forkhelpers.waitpid p) ;
592589
D.debug "Finished waiting for process with pid %d" ipid ;
593-
with_lock log.dying_processes_mutex (fun () ->
594-
log.num_dying_processes := !(log.num_dying_processes) - 1
595-
)
590+
Atomic.decr log.num_dying_processes
596591
)
597592
()
598593
) ;
@@ -633,13 +628,11 @@ let startup log =
633628
() (* We're already started *)
634629
| None -> (
635630
(* Don't start if there are already some processes hanging around *)
636-
with_lock log.dying_processes_mutex (fun () ->
637-
if
638-
!(log.num_dying_processes)
639-
>= Db_globs.redo_log_max_dying_processes
640-
then
641-
raise TooManyProcesses
642-
) ;
631+
if
632+
Atomic.get log.num_dying_processes
633+
>= Db_globs.redo_log_max_dying_processes
634+
then
635+
raise TooManyProcesses ;
643636
match !(log.device) with
644637
| None ->
645638
D.info "Could not find block device" ;
@@ -793,8 +786,7 @@ let create ~name ~state_change_callback ~read_only =
793786
; backoff_delay= ref Db_globs.redo_log_initial_backoff_delay
794787
; sock= ref None
795788
; pid= ref None
796-
; dying_processes_mutex= Mutex.create ()
797-
; num_dying_processes= ref 0
789+
; num_dying_processes= Atomic.make 0
798790
; mutex= Mutex.create ()
799791
}
800792
in

ocaml/tests/bench/bench_pool_field.ml

+32-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ open Bechamel
1717
let () =
1818
Suite_init.harness_init () ;
1919
Printexc.record_backtrace true ;
20-
Debug.set_level Syslog.Emerg
20+
Debug.set_level Syslog.Emerg ;
21+
Xapi_event.register_hooks ()
2122

2223
let date = "20250102T03:04:05Z"
2324

@@ -36,11 +37,21 @@ let json_str =
3637

3738
let __context = Test_common.make_test_database ()
3839

40+
let host = Test_common.make_host ~__context ()
41+
42+
let pool = Test_common.make_pool ~__context ~master:host ()
43+
3944
let () =
40-
let host = Test_common.make_host ~__context () in
41-
let pool = Test_common.make_pool ~__context ~master:host () in
4245
Db.Pool.set_license_server ~__context ~self:pool
43-
~value:[("jsontest", json_str)]
46+
~value:[("jsontest", json_str)] ;
47+
let open Xapi_database in
48+
Db_ref.update_database
49+
(Context.database_of __context)
50+
(Db_cache_types.Database.register_callback "redo_log"
51+
Redo_log.database_callback
52+
)
53+
54+
let vm = Test_common.make_vm ~__context ~name_label:"test" ()
4455

4556
let get_all () : API.pool_t list =
4657
Db.Pool.get_all_records ~__context |> List.map snd
@@ -95,6 +106,20 @@ let event =
95106

96107
let test_rpc_of_event () = Event_types.rpc_of_event event
97108

109+
let counter = Atomic.make 0
110+
111+
let test_set_vm_nvram () : unit =
112+
let c = Atomic.fetch_and_add counter 1 mod 0x7F in
113+
(* use different value each iteration, otherwise it becomes a noop *)
114+
Db.VM.set_NVRAM ~__context ~self:vm
115+
~value:[("test", String.make 32768 (Char.chr @@ c))]
116+
117+
let test_db_pool_write () =
118+
let c = Atomic.fetch_and_add counter 1 mod 0x7F in
119+
Db.Pool.set_tags ~__context ~self:pool ~value:[String.make 16 (Char.chr @@ c)]
120+
121+
let test_db_pool_read () = Db.Pool.get_tags ~__context ~self:pool
122+
98123
let benchmarks =
99124
[
100125
Test.make ~name:"local_session_hook" (Staged.stage local_session_hook)
@@ -109,6 +134,9 @@ let benchmarks =
109134
; Test.make ~name:"Db_lock.with_lock uncontended"
110135
(Staged.stage db_lock_uncontended)
111136
; Test.make ~name:"rpc_of_event" (Staged.stage test_rpc_of_event)
137+
; Test.make ~name:"Db.Pool.set_tags" (Staged.stage test_db_pool_write)
138+
; Test.make ~name:"Db.Pool.get_tags" (Staged.stage test_db_pool_read)
139+
; Test.make ~name:"Db.VM.set_NVRAM" (Staged.stage test_set_vm_nvram)
112140
]
113141

114142
let () = Bechamel_simple_cli.cli benchmarks

ocaml/xapi/pool_db_backup.ml

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ let restore_from_xml __context dry_run (xml_filename : string) =
192192
(Db_xml.From.file (Datamodel_schema.of_datamodel ()) xml_filename)
193193
in
194194
version_check db ;
195-
let db_ref = Db_ref.in_memory (ref (ref db)) in
195+
let db_ref = Db_ref.in_memory (Atomic.make db) in
196196
let new_context = Context.make ~database:db_ref "restore_db" in
197197
prepare_database_for_restore ~old_context:__context ~new_context ;
198198
(* write manifest and unmarshalled db directly to db_temporary_restore_path, so its ready for us on restart *)

ocaml/xapi/xapi_session.ml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1569,5 +1569,5 @@ let create_from_db_file ~__context ~filename =
15691569
Xapi_database.Db_xml.From.file (Datamodel_schema.of_datamodel ()) filename
15701570
|> Xapi_database.Db_upgrade.generic_database_upgrade
15711571
in
1572-
let db_ref = Some (Xapi_database.Db_ref.in_memory (ref (ref db))) in
1572+
let db_ref = Some (Xapi_database.Db_ref.in_memory (Atomic.make db)) in
15731573
create_readonly_session ~__context ~uname:"db-from-file" ~db_ref

ocaml/xapi/xapi_vdi_helpers.ml

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ let database_ref_of_vdi ~__context ~vdi =
184184
debug "Enabling redo_log with device reason [%s]" device ;
185185
Redo_log.enable_block_existing log device ;
186186
let db = Database.make (Datamodel_schema.of_datamodel ()) in
187-
let db_ref = Xapi_database.Db_ref.in_memory (ref (ref db)) in
187+
let db_ref = Xapi_database.Db_ref.in_memory (Atomic.make db) in
188188
Redo_log_usage.read_from_redo_log log Xapi_globs.foreign_metadata_db db_ref ;
189189
Redo_log.delete log ;
190190
(* Upgrade database to the local schema. *)

0 commit comments

Comments
 (0)