-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathpgx_async_example.ml
141 lines (130 loc) · 3.83 KB
/
pgx_async_example.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
(* A basic example of Pgx_async usage *)
open Core_kernel
open Async_kernel
open Async_unix
(* Queries against the temporary Employee table.
   The SQL literals below are kept flush-left so the text sent to the server
   is unchanged. *)
module Employee = struct
  (* Creates the temporary Employee table: a serial [id] primary key plus a
     unique, non-null [name]. The rows returned by the query are discarded. *)
  let create db =
    let ddl =
      {|
CREATE TEMPORARY TABLE Employee (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE);
|}
    in
    Deferred.ignore_m (Pgx_async.simple_query db ddl)
  ;;

  (* This function lets us insert multiple users relatively efficiently:
     one single-element parameter list is built per name and all of them are
     passed to one [Pgx_async.execute_many] call.

     Yields the [id] decoded from each result. Hits [assert false] if a result
     is anything other than exactly one row holding exactly one column. *)
  let insert_many db names =
    let query =
      {|
INSERT INTO Employee (name)
VALUES ($1)
RETURNING id
|}
    in
    let params =
      List.map names ~f:(fun employee_name ->
        [ Pgx_async.Value.of_string employee_name ])
    in
    (* Each INSERT ... RETURNING id is expected to give back a single cell. *)
    let id_of_result rows =
      match rows with
      | [ [ id ] ] -> Pgx.Value.to_int_exn id
      | _ -> assert false
    in
    Pgx_async.execute_many db ~params ~query
    |> Deferred.map ~f:(List.map ~f:id_of_result)
  ;;

  (* Inserts a single employee by delegating to [insert_many] with a
     one-element list and taking the head of the resulting id list. *)
  let insert ~name db = Deferred.map (insert_many db [ name ]) ~f:List.hd_exn
end
(* Queries against the temporary Facility table.
   The SQL literals below are kept flush-left so the text sent to the server
   is unchanged. *)
module Facility = struct
  (* Creates the temporary Facility table and an index on [director_id].
     [director_id] references Employee(id) and is set to NULL when the
     referenced employee is deleted. The query result is discarded. *)
  let create db =
    let ddl =
      {|
CREATE TEMPORARY TABLE Facility (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
director_id INT REFERENCES Employee(id) ON DELETE SET NULL);
CREATE INDEX facility_director_id ON Facility (director_id);
|}
    in
    Pgx_async.simple_query db ddl |> Deferred.ignore_m
  ;;

  (* Inserts one facility and yields the [id] returned by the statement.
     [director_id] is optional and is encoded with [Pgx_async.Value.opt].
     Hits [assert false] if the result is not exactly one row with one
     column. *)
  let insert ~name ?director_id db =
    let query =
      {|
INSERT INTO Facility (name, director_id)
VALUES ($1, $2)
RETURNING id
|}
    in
    let params =
      [ Pgx_async.Value.of_string name
      ; Pgx_async.Value.opt Pgx_async.Value.of_int director_id
      ]
    in
    Deferred.map (Pgx_async.execute db ~params query) ~f:(fun rows ->
      match rows with
      | [ [ id ] ] -> Pgx.Value.to_int_exn id
      | _ -> assert false)
  ;;

  (* Lists every facility name paired with the name of its director.
     The LEFT JOIN keeps facilities that have no matching employee; the
     director column is decoded with the non-raising [Pgx.Value.to_string],
     whose result the caller in this file handles as an option.
     Hits [assert false] on any row that does not have exactly two columns. *)
  let all_name_and_director_name db =
    let query =
      {|
SELECT f.name, e.name
FROM Facility f
LEFT JOIN Employee e ON e.id = f.director_id
|}
    in
    let to_pair = function
      | [ name; director_name ] ->
        Pgx.Value.to_string_exn name, Pgx.Value.to_string director_name
      | _ -> assert false
    in
    Deferred.map (Pgx_async.execute db query) ~f:(List.map ~f:to_pair)
  ;;

  (* Moves a director from one facility to another inside a single
     [Pgx_async.with_transaction] call:
     1. clear [director_id] on [from_facility_id], but only if that facility
        is currently directed by [director_id];
     2. set [director_id] on [to_facility_id].
     The rows returned by both statements are ignored. *)
  let reassign_director db ~director_id ~from_facility_id ~to_facility_id =
    (* Note: with_transaction doesn't currently have any special handling
       for concurrent queries *)
    Pgx_async.with_transaction db (fun txn ->
      let clear_params =
        [ Pgx.Value.of_int director_id; Pgx.Value.of_int from_facility_id ]
      in
      let clear_query =
        {|
UPDATE Facility SET director_id = NULL WHERE id = $2 AND director_id = $1
|}
      in
      Deferred.bind
        (Pgx_async.execute txn ~params:clear_params clear_query)
        ~f:(fun _ ->
          let assign_params =
            [ Pgx.Value.of_int director_id; Pgx.Value.of_int to_facility_id ]
          in
          let assign_query =
            {|
UPDATE Facility SET director_id = $1 WHERE id = $2
|}
          in
          Pgx_async.execute txn ~params:assign_params assign_query
          |> Deferred.ignore_m))
  ;;
end
(* Creates both temporary tables. Employee is created first because
   Facility.director_id is declared with a foreign key to Employee(id). *)
let setup db = Deferred.bind (Employee.create db) ~f:(fun () -> Facility.create db)
(* Prints one "The director of <facility> is <director>" line for every row
   returned by [Facility.all_name_and_director_name], using "(none)" when a
   facility has no director name. *)
let print_directors db =
  Facility.all_name_and_director_name db
  >>| List.iter ~f:(fun (name, director_name) ->
    let director_name = Option.value director_name ~default:"(none)" in
    printf "The director of %s is %s\n" name director_name)
;;

(* Walks through the whole example on a single connection:
   1. create the tables and insert the employee "Steve";
   2. insert two facilities, "Headquarters" (directed by Steve) and
      "New Office" (no director);
   3. print every facility with its director;
   4. reassign Steve from Headquarters to the New Office;
   5. print the facilities again.
   Hits [assert false] if the two inserts do not yield exactly two ids. *)
let main () =
  Pgx_async.with_conn
  @@ fun db ->
  setup db
  >>= fun () ->
  Employee.insert ~name:"Steve" db
  >>= fun steve_id ->
  (* Parallel queries are not an error, but will execute in serial *)
  [ Facility.insert ~name:"Headquarters" ~director_id:steve_id db
  ; Facility.insert ~name:"New Office" db
  ]
  |> Deferred.all
  >>= function
  | [ headquarters_id; new_office_id ] ->
    print_directors db
    >>= fun () ->
    print_endline "Re-assigning Steve to the New Office";
    Facility.reassign_director
      db
      ~director_id:steve_id
      ~from_facility_id:headquarters_id
      ~to_facility_id:new_office_id
    >>= fun () -> print_directors db
  | _ -> assert false
;;
(* Program entry point: hands [main] to Async via
   [Thread_safe.block_on_async_exn]. *)
let () = Thread_safe.block_on_async_exn (fun () -> main ())