-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrunners.ml
204 lines (181 loc) · 7.35 KB
/
runners.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
open! Core
open Little_logger
(* Note: some of these have type annotations to help Merlin out with making the
inferred types look nicer. *)
(* Pre-flight check for an external program.

   Runs [cmd] through [Core_unix.system]. When the command exits cleanly this
   returns unit. Otherwise it logs a fatal message that names [name_or_path]
   and includes the error, then terminates the process with exit code 1.

   [name_or_path] is only used in the log message; [cmd] is what is actually
   executed. *)
let assert_program_good_or_exit name_or_path cmd =
  (* TODO it would be nice to give the user a more specific reason for the
     program failing. *)
  let outcome = Core_unix.system cmd |> Core_unix.Exit_or_signal.or_error in
  match outcome with
  | Error err ->
    Logger.fatal (fun () ->
        let err_msg = Error.to_string_hum err in
        [%string
          "'%{name_or_path}' doesn't look like an executable file. Is it a \
           path to an executable file? If not, is it a command on your \
           PATH? Error: %{err_msg}"] ) ;
    exit 1
  | Ok () ->
    ()
module Msa = struct
open Async
(* A supported external aligner program. Each constructor carries a string
   that the rest of this module uses as the program to run ([run]), as the
   name checked by [assert_program_good_or_exit], and as the printed form
   ([pp_aligner]). [aligner_of_string] stores the caller's original string
   here unchanged. *)
type aligner = Clustalo of string | Mafft of string
(* Check that the aligner's program can be started, or exit the process.

   Builds the shell command "<program> --version" with all output discarded
   and hands it to the top-level [assert_program_good_or_exit], which logs a
   fatal message and exits with code 1 when the command does not exit
   cleanly.

   The program name is shell-quoted before it is spliced into the command.
   The aligner itself is later launched with [Process.run ~prog], which does
   not go through a shell, so a path containing spaces or shell
   metacharacters is a perfectly valid program; without quoting, this check
   would reject it (or have the shell reinterpret it). The unquoted name is
   still what appears in the error message. *)
let assert_program_good_or_exit = function
  | Clustalo name_or_path | Mafft name_or_path ->
    let quoted = Filename.quote name_or_path in
    let cmd = [%string "%{quoted} --version >/dev/null 2>&1"] in
    assert_program_good_or_exit name_or_path cmd
(* Formatter-style printer for [aligner]: writes the string carried by the
   constructor to [ppf]. Both constructors print the same way, so the
   constructor itself is not shown. *)
let pp_aligner ppf aligner =
  let (Clustalo path | Mafft path) = aligner in
  Format.fprintf ppf "%s" path
(* Guess which aligner [s] refers to by a case-insensitive substring search.

   Returns [Some (Clustalo s)] when the lowercased string contains
   "clustalo", otherwise [Some (Mafft s)] when it contains "mafft", otherwise
   [None]. "clustalo" wins when both occur. The original, un-lowercased [s]
   is what gets stored in the result. *)
let aligner_of_string s =
  let lowered = String.lowercase s in
  let mentions name = String.is_substring lowered ~substring:name in
  match (mentions "clustalo", mentions "mafft") with
  | true, _ ->
    Some (Clustalo s)
  | false, true ->
    Some (Mafft s)
  | false, false ->
    None
(* Options for one aligner run.
   - [infile]: input path; passed after "-i" for clustalo and as the trailing
     argument for mafft.
   - [outfile]: output path; passed after "-o" for clustalo, and the file
     that mafft's stdout is written to.
   - [other_parameters]: extra command-line text placed before the file
     arguments.
   - [max_retries]: forwarded to [run_until_succes_or_error] as its retry
     limit. *)
type opts =
{infile: string; outfile: string; other_parameters: string; max_retries: int}
(* Record bundling a run's status ([result]), two output strings ([stdout],
   [stderr]) and the [opts] it was given — presumably the captured output of
   the aligner process.
   NOTE(review): nothing in the visible part of this module constructs this
   type (the run functions return [string Deferred.Or_error.t]); confirm
   whether it is still used elsewhere. *)
type out =
{result: unit Or_error.t; stdout: string; stderr: string; opts: opts}
(* Build the argument list for a clustalo invocation:
   the user's extra parameters, then "-i <infile> -o <outfile>".

   [opts.other_parameters] is a single space-separated string, so it is split
   on spaces. Empty pieces are dropped: an empty parameter string, or one
   with leading, trailing or repeated spaces, must not turn into empty-string
   arguments handed to the program. The input and output paths are appended
   as whole list elements (never split), so paths containing spaces survive
   intact. *)
let make_clustalo_args opts =
  let extra_args =
    String.split opts.other_parameters ~on:' '
    |> List.filter ~f:(fun arg -> not (String.is_empty arg))
  in
  extra_args @ ["-i"; opts.infile; "-o"; opts.outfile]
(* Build the argument list for a mafft invocation:
   the user's extra parameters followed by the input file.

   [opts.other_parameters] is a single space-separated string, so it is split
   on spaces. Empty pieces are dropped: an empty parameter string, or one
   with leading, trailing or repeated spaces, must not turn into empty-string
   arguments handed to the program. The input path is appended as one whole
   list element (never split), so a path containing spaces survives
   intact. *)
let make_mafft_args opts =
  let extra_args =
    String.split opts.other_parameters ~on:' '
    |> List.filter ~f:(fun arg -> not (String.is_empty arg))
  in
  extra_args @ [opts.infile]
(* Delete [filename] when [Utils.is_file] says it is a file; otherwise do
   nothing. Either way the result is a deferred unit. *)
let remove_if_exists filename =
  match Utils.is_file filename with
  | true ->
    Sys.remove filename
  | false ->
    return ()
(* Human-readable form of a [prog]/[args] pair as handed to Process.run: the
   program, one space, then the arguments joined by single spaces. Used for
   log and error messages only; nothing is quoted or escaped. *)
let cmd_to_string (prog : string) args =
  prog ^ " " ^ String.concat ~sep:" " args
(* Warn that the command built from [prog] and [args] failed with [err] and
   that another attempt will follow. The message shows the command and the
   error text. *)
let log_retryable_error prog args err =
  let cmd = cmd_to_string prog args in
  let err_msg = Error.to_string_hum err in
  Logger.swarning
    [%string "Command (%{cmd}) failed. Will retry.\nError was: %{err_msg}"]
(* Report, at error level, that the command built from [prog] and [args]
   failed with [err] and that no further attempts will be made. The message
   shows the command and the error text. *)
let log_final_error prog args err =
  let cmd = cmd_to_string prog args in
  let err_msg = Error.to_string_hum err in
  Logger.serror
    [%string
      "Command (%{cmd}) failed. Max attempts exceeded.\nError was: %{err_msg}"]
(* Run [prog] with [args] via [Process.run], retrying when it fails.

   - On success the deferred result is [Ok stdout].
   - On failure, if fewer than [max_retries] failures have happened so far,
     the failure is logged as retryable, we wait, and the command is run
     again. The command is therefore run at most [max_retries + 1] times.
   - Once the limit is reached the failure is logged as final and the last
     error is returned, tagged "job failed after max-retries".

   [delay] is the pause between attempts in seconds (default 0.1). A
   non-positive delay means no pause; otherwise the span is randomized by 25
   percent. *)
let run_until_succes_or_error ?(delay = 0.1) prog args max_retries =
  (* The pause is computed once, up front, and reused for every retry. *)
  let pause =
    match Float.(delay <= 0.0) with
    | true ->
      sec 0.0
    | false ->
      Time.Span.randomize (sec delay) ~percent:(Percent.of_percentage 25.)
  in
  let rec attempt failures_so_far =
    match%bind Process.run ~prog ~args () with
    | Ok stdout ->
      Deferred.Or_error.return stdout
    | Error err when failures_so_far < max_retries ->
      log_retryable_error prog args err ;
      (* Give it a moment before trying the job again. *)
      let%bind () = after pause in
      attempt (failures_so_far + 1)
    | Error err ->
      log_final_error prog args err ;
      Deferred.Or_error.tag
        (Deferred.Or_error.fail err)
        ~tag:"job failed after max-retries"
  in
  attempt 0
(* Run mafft ([exe]) on [opts.infile], with retries, and save the alignment.

   mafft prints the alignment on stdout, so on success that text is written
   to [opts.outfile] (created with mode 0o644) and the function returns
   [Ok ""] -- an empty string, so the return type matches run_clustalo. On
   failure the error is returned tagged "mafft failed".

   Stderr is not available here; if you need it, you will need to switch to
   a lower-level function than Process.run. *)
let run_mafft opts exe : string Deferred.Or_error.t =
  let args = make_mafft_args opts in
  Logger.debug (fun () -> "Running command: " ^ cmd_to_string exe args) ;
  let%bind outcome = run_until_succes_or_error exe args opts.max_retries in
  match outcome with
  | Error err ->
    Deferred.Or_error.tag (Deferred.Or_error.fail err) ~tag:"mafft failed"
  | Ok alignment ->
    (* stdout for mafft is the actual alignment, so it becomes the outfile. *)
    let%bind () =
      Writer.with_file opts.outfile ~perm:0o644 ~f:(fun writer ->
          Writer.write_line writer alignment ;
          return () )
    in
    Deferred.Or_error.return ""
(* Run clustalo ([exe]) with retries. Unlike mafft, clustalo writes the
   output file itself, so this function only has to check on that file.

   - Success and [opts.outfile] exists: returns [Ok stdout].
   - Success but [opts.outfile] is missing: returns an error that names the
     command and the missing file.
   - Failure: removes [opts.outfile] if it exists, then returns the error
     tagged "clustalo failed". *)
let run_clustalo opts exe : string Deferred.Or_error.t =
  let args = make_clustalo_args opts in
  let cmd = cmd_to_string exe args in
  Logger.debug (fun () -> "Running command: " ^ cmd) ;
  let%bind outcome = run_until_succes_or_error exe args opts.max_retries in
  match outcome with
  | Error err ->
    (* The aligner failed, so make sure no outfile is left behind. *)
    let%bind () = remove_if_exists opts.outfile in
    Deferred.Or_error.tag (Deferred.Or_error.fail err) ~tag:"clustalo failed"
  | Ok stdout when Utils.is_file opts.outfile ->
    (* Double-checked: the outfile really is there. *)
    Deferred.Or_error.return stdout
  | Ok _ ->
    (* The command claimed success but produced no outfile; say so. *)
    Deferred.Or_error.errorf
      "Command (%s) succeeded, but the outfile (%s) does not exist!" cmd
      opts.outfile
(* Run the given aligner with [opts], dispatching on the constructor to
   [run_mafft] or [run_clustalo]. The string carried by the constructor is
   used as the executable. *)
let run opts aligner =
  match aligner with
  | Mafft exe ->
    run_mafft opts exe
  | Clustalo exe ->
    run_clustalo opts exe
end
module Hmmalign = struct
(* Options for one hmmalign run, as consumed by [run]:
   - [exe]: the program text placed at the start of the command line.
   - [outfile]: path given to the "-o" option.
   - [targets]: first positional argument on the command line.
   - [queries]: second positional argument on the command line. *)
type opts = {exe: string; queries: string; targets: string; outfile: string}
(* What [run] returns:
   - [result]: [Ok ()] when the command exited cleanly and the outfile
     exists, otherwise an error.
   - [stdout], [stderr]: everything read from the process's stdout and
     stderr channels.
   - [opts]: the options the run was started with. *)
type out =
{result: unit Or_error.t; stdout: string; stderr: string; opts: opts}
(* Check that hmmalign can be started, or exit the process: runs
   "<name_or_path> -h" with all output discarded through the top-level
   [assert_program_good_or_exit], which logs a fatal message and exits with
   code 1 when the command does not exit cleanly. *)
let assert_program_good_or_exit name_or_path =
  assert_program_good_or_exit name_or_path
    (name_or_path ^ " -h >/dev/null 2>&1")
(* Run hmmalign synchronously and report what happened.

   Builds the shell command
   "<exe> --outformat=afa -o <outfile> <targets> <queries>", runs it, reads
   all of the process's stdout and then all of its stderr, closes the
   channels and waits for the exit status.

   Returns an [out] record that always carries the captured stdout/stderr
   and the original [opts]. Its [result] is:
   - [Ok ()] when the command exited cleanly and [opts.outfile] exists;
   - an error when the command exited cleanly but the outfile is missing;
   - an "hmmalign failed" error wrapping the exit/signal error otherwise, in
     which case any outfile left behind is removed first.

   The three file paths are shell-quoted because the command goes through a
   shell: unquoted, a path with spaces or shell metacharacters would be
   split or reinterpreted. [opts.exe] is deliberately left as given so that
   whatever program text callers pass today keeps working.

   NOTE(review): stdout is drained completely before stderr is read. If the
   child ever writes enough to stderr to fill the pipe before it finishes
   with stdout, both sides would block -- confirm hmmalign's stderr output
   stays small, or read the two streams concurrently. *)
let run opts =
  let outfile = Filename.quote opts.outfile in
  let targets = Filename.quote opts.targets in
  let queries = Filename.quote opts.queries in
  let cmd =
    [%string "%{opts.exe} --outformat=afa -o %{outfile} %{targets} %{queries}"]
  in
  Logger.debug (fun () -> [%string "Running command: %{cmd}"]) ;
  (* Pass in the env explicitly to work with Alpine linux. *)
  let chan =
    Core_unix.open_process_full cmd ~env:(Core_unix.environment ())
  in
  let stdout = In_channel.input_all chan.stdout in
  let stderr = In_channel.input_all chan.stderr in
  match
    Core_unix.close_process_full chan |> Core_unix.Exit_or_signal.or_error
  with
  | Ok () ->
    let result =
      if Utils.is_file opts.outfile then Or_error.return ()
      else
        Or_error.errorf
          "hmmalign succeeded, but the outfile (%s) does not exist!"
          opts.outfile
    in
    {result; stdout; stderr; opts}
  | Error err ->
    (* hmmalign failed, make sure the outfile is deleted *)
    if Utils.is_file opts.outfile then Sys_unix.remove opts.outfile ;
    let result = Or_error.error "hmmalign failed" err Error.sexp_of_t in
    {result; stdout; stderr; opts}
end