-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcores.ml
216 lines (197 loc) · 6.43 KB
/
cores.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
(**************************************************************************)
(* *)
(* Functory: a distributed computing library for Ocaml *)
(* Copyright (C) 2010 Jean-Christophe Filliatre and Kalyan Krishnamani *)
(* *)
(* This software is free software; you can redistribute it and/or *)
(* modify it under the terms of the GNU Library General Public *)
(* License version 2.1, with the special exception on linking *)
(* described in file LICENSE. *)
(* *)
(* This software is distributed in the hope that it will be useful, *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *)
(* *)
(**************************************************************************)
open Format
open Control
open Unix
(* Master scheduling loop.

   [run ~create_job ~wait workers tasks] keeps the workers busy until there
   is nothing left to do:
   - [create_job w t] is called to start task [t] on the idle worker [w];
   - [wait ()] is called to obtain a worker whose job is finished, together
     with the list of new tasks to schedule.

   Pending tasks and idle workers are both taken last-in first-out.  The
   loop ends once no task is pending and no job is running. *)
let run
    ~(create_job : 'worker -> 'task -> unit)
    ~(wait : unit -> 'worker * 'task list)
    (workers : 'worker list)
    (tasks : 'task list)
  =
  (* LIFO stacks kept as lists: the head is the next element to be taken *)
  let pending = ref (List.rev tasks) in
  let available = ref (List.rev workers) in
  let running = ref 0 in
  let no_pending () = match !pending with [] -> true | _ :: _ -> false in
  (* pair idle workers with pending tasks for as long as both exist *)
  let rec dispatch () =
    match !available, !pending with
    | w :: ws, t :: ts ->
      pending := ts;
      available := ws;
      create_job w t;
      incr running;
      dispatch ()
    | [], _ | _, [] -> ()
  in
  let rec loop () =
    if not (no_pending () && !running <= 0) then begin
      dispatch ();
      assert (!running > 0);
      (* nothing more can be started: block until some job is finished *)
      let w, produced = wait () in
      decr running;
      available := w :: !available;
      pending := List.rev_append produced !pending;
      loop ()
    end
  in
  loop ();
  assert (no_pending () && !running = 0)
(* Number of workers used by [compute]; defaults to 1. *)
let ncores = ref 1

(* [set_number_of_cores n] sets the number of workers to [n].
   Raises [Invalid_argument] if [n < 1]: without any worker no task could
   ever be started, and the scheduling loop would fail much later with an
   unrelated error. *)
let set_number_of_cores n =
  if n < 1 then
    invalid_arg "Cores.set_number_of_cores: number of cores must be positive";
  ncores := n

(* [listij acc i j] prepends the integers [i], [i+1], ..., [j], in
   increasing order, to [acc]; [acc] is returned unchanged when [i > j]. *)
let rec listij acc i j = if i > j then acc else listij (j :: acc) i (j - 1)

(* [workers ()] is the list of worker identifiers [1; 2; ...; !ncores]. *)
let workers () = listij [] 1 !ncores
(*** using local files ***************************************************)
(* A job describes one task being computed by a forked child process.
   Jobs are built by [create_worker] and looked up by PID in [compute]. *)
type 'a job = {
(* identifier of the worker the task was assigned to *)
worker : int;
(* PID of the child process, as returned by [fork] in the parent *)
pid : int;
(* temporary file into which the child marshals its result *)
file : string;
(* the task being computed *)
task : 'a;
}
(* [create_worker w f t] forks a child process that computes [f (fst t)] and
   marshals the result into a fresh temporary file.  The parent returns
   immediately with the job record describing the child.

   - [w] is the worker identifier stored in the job;
   - [f] is the function run in the child;
   - [t] is the whole task; only its first component is passed to [f].

   The child never returns from this function.  It exits with status 0 once
   the result has been written, and with status 1 if [f] or the write raises,
   so that an exception cannot unwind into the forked copy of the master's
   code.

   Raises [Sys_error] if the temporary file cannot be created and
   [Unix.Unix_error] if [fork] fails (the temporary file is then removed). *)
let create_worker w (f : 'a -> 'b) (t : 'a * 'c) : ('a * 'c) job =
  let file = Filename.temp_file "mapfold" "output" in
  let pid =
    try fork ()
    with e ->
      (* no child will ever fill the file: do not leave it behind *)
      (try Sys.remove file with Sys_error _ -> ());
      raise e
  in
  if pid = 0 then begin
    (* child *)
    let status =
      try
        let r = f (fst t) in
        (* marshalled data must go through a binary-mode channel *)
        let c = open_out_bin file in
        (try output_value c r with e -> close_out_noerr c; raise e);
        close_out c;
        0
      with e ->
        Format.eprintf "worker %d (PID %d): uncaught exception: %s@."
          w (getpid ()) (Printexc.to_string e);
        1
    in
    exit status
  end else
    (* parent *)
    { worker = w; pid = pid; file = file; task = t }
(* [compute ~worker ~master tasks] processes [tasks] with forked worker
   processes, one job at a time per identifier returned by [workers ()].

   - [worker] is run in a child process on the first component of a task;
   - [master] is run in the master process on a finished task and its
     result, and returns the list of new tasks to schedule.

   The result of a job is read back from the temporary file recorded in the
   job, which is then deleted.  Exited processes whose PID is not in the job
   table are ignored.  The master process prints a message on stderr and
   exits with status 1 when a child is killed or stopped, or when a worker
   terminates with a non-zero exit code. *)
let compute
    ~(worker : 'a -> 'b) ~(master : ('a * 'c) -> 'b -> ('a * 'c) list) tasks =
  let jobs = Hashtbl.create 17 in (* PID -> job *)
  (* Reads the marshalled result of job [j] and deletes its file; the channel
     is closed and the file deleted even when unmarshalling fails. *)
  let read_result j : 'b =
    let c = open_in_bin j.file in
    let r =
      try input_value c
      with e ->
        close_in_noerr c;
        (try Sys.remove j.file with Sys_error _ -> ());
        raise e
    in
    close_in c;
    Sys.remove j.file;
    r
  in
  let rec wait () =
    match Unix.wait () with
    | p, WEXITED e ->
      dprintf "master: got result from worker PID %d@." p;
      (* [Not_found] is caught around the table lookup only, so that a
         [Not_found] raised by [master] is not mistaken for an unknown PID *)
      let known = try Some (Hashtbl.find jobs p) with Not_found -> None in
      begin match known with
        | None ->
          (* If the pid is unknown to us, it's probably a process created
             by one of the workers. In this case, simply continue to wait. *)
          wait ()
        | Some j ->
          Hashtbl.remove jobs p;
          if e <> 0 then begin
            (* the worker failed: its file holds no usable result *)
            (try Sys.remove j.file with Sys_error _ -> ());
            Format.eprintf
              "master: ** worker %d (PID %d) exited with code %d **@."
              j.worker p e;
            exit 1
          end;
          dprintf "master: got result from worker %d@." j.worker;
          let r : 'b = read_result j in
          let l = master j.task r in
          j.worker, l
      end
    | p, _ ->
      Format.eprintf "master: ** PID %d killed or stopped! **@." p;
      exit 1
  in
  run
    ~create_job:(fun w t ->
      let j = create_worker w worker t in
      dprintf "master: started worker %d (PID %d)@." w j.pid;
      Hashtbl.add jobs j.pid j)
    ~wait (workers ()) tasks
(*** using pipes *********************************************************)
(****
type 'a p_job = {
p_worker : int;
p_pid : int;
p_file : file_descr;
p_task : 'a;
}
let create_worker w (f : 'a -> 'b) (t : 'a * 'c) : ('a * 'c) p_job =
let fdin, fdout = Unix.pipe () in
match fork () with
| 0 -> (* child *)
close fdin;
let r = f (fst t) in
let c = out_channel_of_descr fdout in
output_value c r;
exit 0
| pid -> (* parent *)
close fdout;
{ p_worker = w;
p_pid = pid;
p_file = fdin;
p_task = t }
let compute
~(worker : 'a -> 'b) ~(master : ('a * 'c) -> 'b -> ('a * 'c) list) tasks =
let jobs = Hashtbl.create 17 in (* fd -> job *)
let rec wait () =
let fds = Hashtbl.fold (fun fd _ acc -> fd :: acc) jobs [] in
match select fds [] [] (-1.0) with
| fd :: _, _, _ ->
let j = Hashtbl.find jobs fd in
dprintf "master: got result from worker PID %d@." j.p_pid;
dprintf "master: got result from worker %d@." j.p_worker;
Hashtbl.remove jobs fd;
let c = in_channel_of_descr j.p_file in
let r : 'b = input_value c in
close_in c;
let l = master j.p_task r in
j.p_worker, l
| [], _, _ ->
assert false
in
run
~create_job:(fun w t ->
let j = create_worker w worker t in
dprintf "master: started worker %d (PID %d)@." w j.p_pid;
Hashtbl.add jobs j.p_file j)
~wait (workers ()) tasks
****)
(* derived API: apply the functor [Map_fold.Make] to this module's [compute]
   and include everything the resulting module defines.  [Map_fold] is
   defined elsewhere in the project; presumably it builds the map/fold
   operations on top of [compute] -- confirm against its interface. *)
include Map_fold.Make(struct let compute = compute end)
(*******
type ('a, 'b) map_reduce =
| Map of 'a
| Reduce of 'b
let map_reduce ~map ~reduce l =
let results = Hashtbl.create 17 in
let to_reduce = Hashtbl.create 17 in
let add k2 v2l =
try
let l = Hashtbl.find results k2 in
Hashtbl.replace results k2 (v2l :: l);
Hashtbl.replace to_reduce k2 ()
with Not_found ->
Hashtbl.add results k2 [v2l]
in
let reduce_tasks () =
let tl =
Hashtbl.fold
(fun k2 _ acc -> (Reduce (k2, Hashtbl.find results k2)) :: acc)
to_reduce []
in
Hashtbl.iter (fun x _ -> Hashtbl.remove results x) to_reduce;
Hashtbl.clear to_reduce;
tl
in
master
~f:(function
| Map v1 -> Map (map v1)
| Reduce (k2, v2l) -> Reduce (reduce k2 v2l))
~handle:(fun x r ->
match x, r with
| Map _, Map r ->
List.iter (fun (k2, v2) -> add k2 [v2]) r; reduce_tasks ()
| Reduce (k2, _), Reduce r ->
add k2 r; reduce_tasks ()
| _ ->
assert false)
(List.map (fun x -> Map x) l);
Hashtbl.fold
(fun k2 v2l res -> match v2l with
| [v2l] -> (k2, v2l) :: res
| _ -> assert false)
results []
**********)