-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrunslog
executable file
·405 lines (368 loc) · 17.3 KB
/
runslog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
#!/usr/bin/env python3
"""
Runs a slog file on server or locally.
USAGE: `./runslog -s <server-address IP> <slogfile> <out-dir> -f <factdir>`
`./runslog -j4 -s <server-address IP> <slogfile> <out-dir> -f onerel.facts`
`./runslog <slogfile> <out-dir> --cores=8`
`./runslog --help`
`./runslog -e <out-dir>`
fact directory is optional.
However, the output will be stored on the server, and `dumpslog` needs to be used
to get the output... dumpslog does not exist at the moment.
"""
import argparse, os, shutil, subprocess, time, sys
from yaspin import yaspin
from slog.common.client import SlogClient
from slog.repl.repl import Repl
def is_temp_rel_name(rel_name):
    """Return True when rel_name is a compiler-generated temporary relation.

    Temporary relations are recognized by the markers the slog compiler
    embeds in their names.
    """
    return any(marker in rel_name for marker in ('$bir', '$inter', '$rule', '$head'))
def ingest_facts(factloc, inputloc, tsv_bin_path, cores):
# -- go through every .table file in the input data
# -- extract tag, relation name and arity.
tables = {}
for tablefile in os.listdir(inputloc):
if not tablefile.endswith('.table'):
continue
without_ext = tablefile[:tablefile.rfind('.')]
tagendloc = without_ext.find('.')
aritystartloc = without_ext.rfind('.')
tag = without_ext[:tagendloc]
arity = int(without_ext[aritystartloc+1:])
relname = without_ext[tagendloc+1:aritystartloc]
tables[(relname, arity)] = (tag, tablefile)
current_max_tag = max([int(t[0]) for t in tables.values()])
# -- now look through the input .facts file(s) and run
# -- tsv_to_bin on any.
if not os.path.exists(factloc):
print(f"Fact file/directory does not exist! Given `{factloc}`")
return None
infacts = set()
if os.path.isdir(factloc):
for factfile in os.listdir(factloc):
extn = factfile[factfile.rfind('.'):]
if extn not in {'.csv', '.tsv', '.facts'} or 'string.csv' in factfile:
continue
infacts.add(os.path.join(factloc, factfile))
else:
# just add the file, dont check extension
infacts.add(factloc)
# -- now copy all input table facts inside factloc but rearrange rel tag
wdb_flag = False
for table_file in os.listdir(factloc):
if '$strings.csv' in table_file:
wdb_flag = True
if wdb_flag:
os.system(f"rm -r {inputloc}")
os.system(f"cp -r {factloc} {inputloc}")
for f in os.listdir(inputloc):
if is_temp_rel_name(f):
os.remove(os.path.join(inputloc, f))
else:
for factfile in infacts:
with open(factfile, 'r') as ff:
first = ff.readline()
# immediate "" means empty file, so no facts to add.
if first == "":
continue
# get relname from file to compare against
base = os.path.basename(factfile)
relname = base[:base.rfind('.')]
arity = len(first.split('\t'))
table = tables.get((relname, arity))
if not table:
print(f"`{relname}` with arity `{arity}` not found in slogfile, but used as input.")
tabletag = str(current_max_tag + 1)
current_max_tag = current_max_tag + 1
print(f"assign a new tag `{tabletag}` to `{relname}` with arity `{arity}`")
tableloc = os.path.join(inputloc, f"{tabletag}.{relname}.{arity}.table")
else:
tabletag = table[0]
tableloc = os.path.join(inputloc, table[1])
try:
# idk why 16 is buckets, got from rpc.py
subprocess.check_output([tsv_bin_path, factfile, str(arity),
tableloc, str(cores), str(tabletag), inputloc])
except subprocess.CalledProcessError as e:
print(f"tsv_to_bin failed! Code: `{e.returncode}`, Error:\n{e.output.decode()}")
return None
return True
def ensure_backend_built(compile_backend, tsv_bin_path, backend_build_dir, backend_dir, cores, cmake_prof_opt, cmake_build_mode, verbose):
    """Build the slog MPI backend with cmake unless it is already built.

    The build is skipped when tsv_bin_path already exists and compile_backend
    is False. Returns True on success (or skip), False when the cmake
    configure/build fails; previously a failure was only printed and the
    caller could not tell.
    """
    # check if we need to compile.
    if os.path.exists(tsv_bin_path) and not compile_backend:
        if verbose:
            print("Not compiling backend.")
        return True
    if verbose:
        print("Compiling slog MPI backend")
    try:
        start = time.time()
        # clean build directory so stale cmake caches cannot poison the build
        shutil.rmtree(backend_build_dir, ignore_errors=True)
        cmake_config_cmd = ["cmake", "--no-warn-unused-cli",
                            "-DCMAKE_EXPORT_COMPILE_COMMANDS:BOOL=TRUE",
                            f"-DCMAKE_BUILD_TYPE:STRING={cmake_build_mode}",
                            "-DCMAKE_C_COMPILER:FILEPATH=mpicc",
                            "-B"+backend_build_dir, "-H"+backend_dir] + cmake_prof_opt
        subprocess.check_output(cmake_config_cmd,
                                stderr=subprocess.STDOUT)
        subprocess.check_output(["cmake", "--build", backend_build_dir,
                                 "--config", cmake_build_mode, "--target", "all",
                                 "-j", f"{cores}"], stderr=subprocess.STDOUT)
        end = time.time()
        if verbose:
            print(f"Time taken: {end - start}")
        return True
    except subprocess.CalledProcessError as e:
        # fix: message previously read "Compiling Compiling slog MPI backend failed!"
        print(f"Compiling slog MPI backend failed! Code: `{e.returncode}`, Error:\n{e.output.decode()}")
        return False
def run_local(file, out_path, factloc, cores, verbose, slog_root=None, compile_backend=False,
              debug_flag=False, profile_flag=False, repl_flag=False, compile_only=False,
              overwrite_out=False):
    """
    Calls local utilities to compile and run a slog file locally.
    slog_root is the location of the slog repository.
    If not provided, it is assumed to be the current working directory.
    """
    if not slog_root:
        slog_root = os.getcwd()
    # gprof instrumentation flags for the backend cmake build (-p/--profile)
    if profile_flag:
        cmake_prof_opt = ["-DCMAKE_CXX_FLAGS=-pg", "-DCMAKE_EXE_LINKER_FLAGS=-pg",
                          "-DCMAKE_SHARED_LINKER_FLAGS=-pg"]
    else:
        cmake_prof_opt = []
    if debug_flag:
        cmake_build_mode = "Debug"
    else:
        cmake_build_mode = "RelWithDebInfo"
    backend_dir = os.path.join(slog_root, "backend")
    backend_build_dir = os.path.join(backend_dir, "build")
    backend_slog_bin = os.path.join(backend_build_dir, "slog")
    tsv_bin_path = os.path.join(backend_build_dir, "tsv_to_bin")
    # make inputs absolute pathdums, so we can chdir more carelessly
    file = os.path.realpath(file)
    # NOTE(review): all intermediate work happens under a fixed ./out in the
    # current working directory; out_path is only consulted at the final copy
    # step (#6) below — confirm this is intended.
    outloc = os.path.realpath("out")
    # outloc = out_path
    factloc = os.path.realpath(factloc) if factloc else None
    inputloc = os.path.join(outloc, "input-data")
    checkpointloc = os.path.join(outloc, "checkpoints")
    # Ensure outdir doesnt exist before we start filling it up.
    shutil.rmtree(outloc, ignore_errors=True)
    #1: shell out to slog compiler to compile file to C++
    # Get the name of the binary (just strip the `.slog` out)
    if not file.endswith('.slog'):
        print(f"Not given a slog file! given: {file}")
        return
    # ensure these exist so we can use them.
    os.makedirs(outloc, exist_ok=True)
    os.makedirs(inputloc, exist_ok=True)
    os.makedirs(checkpointloc, exist_ok=True)
    # preprocess slog file into outloc using mzpp
    try:
        preprocessed_file = os.path.join(outloc, os.path.basename(file))
        output = subprocess.check_output(["mzpp", "-o", preprocessed_file, file],
                                         stderr=subprocess.STDOUT)
        # from here on, compile the preprocessed copy, not the original
        file = preprocessed_file
    except subprocess.CalledProcessError as e:
        print(f"Preprocessing failed! Code: `{e.returncode}`, Error:\n{e.output.decode()}")
        return
    if verbose:
        print("JIT Compiling .slog to .slogc")
    try:
        start = time.time()
        # racket compiler emits the .slogc program into outloc
        output = subprocess.check_output(["racket", "compiler/slog.rkt", "-c", "--ps", "-f",
                                          "--output-code", outloc, "--output-db", checkpointloc,
                                          "--buckets", str(cores),
                                          "--input-db", inputloc, file],
                                         stderr=subprocess.STDOUT)
        end = time.time()
        if verbose:
            ov = output.decode()
            print(ov)
            # keep the compiler transcript next to the generated code
            with open(os.path.join(outloc, "compiler-out"), "w") as f:
                f.write(ov)
            print(f"Time taken: {end - start}")
    except subprocess.CalledProcessError as e:
        print(f"Slog->C++ compilation failed! Code: `{e.returncode}`, Error:\n{e.output.decode()}")
        return
    # the compiler names the output after the source file with a .slogc suffix
    slogc_name = os.path.basename(file[:file.rfind('.')]) + ".slogc"
    slogc_path = os.path.join(outloc, slogc_name)
    #2: Shell out to cmake to compile backend + utilities (tsv_to_bin)
    ensure_backend_built(compile_backend, tsv_bin_path, backend_build_dir, backend_dir, cores, cmake_prof_opt, cmake_build_mode, verbose)
    #3: Shell out to tsv_to_bin to convert fact directory to bin
    if verbose:
        print("Calling tsv->bin")
    if factloc:
        start = time.time()
        out = ingest_facts(factloc, inputloc, tsv_bin_path, cores)
        if not out:
            print("FAILURE TO INGEST FACTS!")
            return
        end = time.time()
        if verbose:
            print(f"Time taken: {end - start}")
    if not compile_only:
        #4: Shell out to mpirun to run compiled slog binary with in and out args.
        if verbose:
            print("Running slog executable.")
        try:
            mpirun_bin_path = f"{os.getenv('MPI_BIN_HOME', '/opt/mpich/bin')}/mpirun"
            start = time.time()
            if not debug_flag:
                cmd = [mpirun_bin_path, "-n", f"{cores}", backend_slog_bin,
                       slogc_path, inputloc, outloc]
                print(cmd)
                out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            else:
                # NOTE(review): this valgrind branch passes outloc before
                # inputloc, the reverse of the non-debug branch above —
                # confirm which ordering the backend expects.
                out = subprocess.check_output(
                    [mpirun_bin_path, "-n", f"{cores}", "valgrind", "--leak-check=yes", "--show-reachable=yes",
                     backend_slog_bin, slogc_path, outloc, inputloc],
                    stderr=subprocess.STDOUT)
            end = time.time()
            if verbose:
                print(out.decode())
                print(f"Time taken: {end - start}")
        except subprocess.CalledProcessError as e:
            print(f"Program failed! Code: `{e.returncode}`, Error:\n{e.output.decode()}")
            return
    if verbose:
        print("Build success!")
    #6: copy to output dir
    out_path = os.path.realpath(out_path)
    # NOTE(review): when out_path does not already exist, no copy is made and
    # results stay only under ./out — confirm whether a copytree is missing here.
    if os.path.exists(out_path) and (out_path != outloc):
        if overwrite_out:
            shutil.rmtree(out_path)
            shutil.copytree(outloc, out_path)
        else:
            print("Output dir exists, overwrite it? (temporary result under `$SLOG_HOME/out`)")
            while True:
                confirm_flag = input("(Y/n): ")
                confirm_flag = confirm_flag.strip()
                if confirm_flag in ["y", "Y"]:
                    shutil.rmtree(out_path)
                    shutil.copytree(outloc, out_path)
                    break
                else:
                    # any non-yes answer keeps the existing out_path untouched
                    break
    #7: Open the repl if needed
    # the REPL needs the string-interning table alongside the final checkpoint
    shutil.copy(os.path.join(inputloc, '$strings.csv'),
                f'{outloc}/checkpoint-final/$strings.csv')
    # copy unused input file into checkpoint
    checkpointloc = f'{outloc}/checkpoint-final'
    # index the checkpoint's tables by (relname, arity), parsed from
    # filenames shaped "<tag>.<relname>.<arity>.table"
    checkpointe_tables = {}
    for tablefile in os.listdir(checkpointloc):
        if not tablefile.endswith('.table'):
            continue
        without_ext = tablefile[:tablefile.rfind('.')]
        tagendloc = without_ext.find('.')
        aritystartloc = without_ext.rfind('.')
        tag = without_ext[:tagendloc]
        arity = int(without_ext[aritystartloc+1:])
        relname = without_ext[tagendloc+1:aritystartloc]
        checkpointe_tables[(relname, arity)] = (tag, tablefile)
    current_max_tag = max([int(t[0]) for t in checkpointe_tables.values()])
    # bring over any input relation the program never touched, so the REPL
    # can still query it from the final checkpoint
    input_table = {}
    for tablefile in os.listdir(inputloc):
        if not tablefile.endswith('.table'):
            continue
        without_ext = tablefile[:tablefile.rfind('.')]
        tagendloc = without_ext.find('.')
        aritystartloc = without_ext.rfind('.')
        tag = without_ext[:tagendloc]
        arity = int(without_ext[aritystartloc+1:])
        relname = without_ext[tagendloc+1:aritystartloc]
        if (relname, arity) not in checkpointe_tables and (not is_temp_rel_name(relname)):
            # tag = current_max_tag
            # current_max_tag = current_max_tag + 1
            shutil.copyfile(f"{inputloc}/{tablefile}", f"{checkpointloc}/{tag}.{relname}.{arity}.table")
            input_table[(relname, arity)] = (tag, tablefile)
    if repl_flag:
        repl = Repl(local_db_path=f'{outloc}/checkpoint-final')
        repl.loop()
def run_server(file, outloc, factloc, server, cores, verbose):
    """
    Runs the file on the server using the SlogClient interface.

    Compiles `file` remotely, optionally uploads facts from factloc, then
    runs the program with `cores` processes. Progress is shown with a
    yaspin spinner; errors are reported through it and abort the run.
    """
    client = SlogClient(server)
    cur_db = ""
    with yaspin(text="Compiling slog file") as spinner:
        cur_db = client.compile_slog(file, spinner)
        if not cur_db:
            spinner.write("Error compiling slog")
            return
        if factloc:
            cur_db = client.upload_csv(factloc, spinner)
            if not cur_db:
                spinner.write("Error uploading facts")
                return
        else:
            spinner.write("No input facts, continuing...")
        spinner.text = "Running program..."
        cur_db = client.run_with_db(file, cur_db, cores, spinner)
        if not cur_db:
            spinner.write("Error running file")
            # fix: previously fell through and printed "Final DB:\nNone"
            return
        # TODO: what should the semantics be here
        # just print the final ID so we can inspect in REPL?
        # client.pretty_dump_relation("path", spinner)
        spinner.write(f"Final DB:\n{cur_db}")
        spinner.text = "FINISHED!"
def enterrepl(base_folder):
    """Open an interactive REPL over the final checkpoint inside base_folder.

    Ensures the checkpoint directory exists and carries a `$strings.csv`
    (copied from the input data, or created empty) before starting the REPL.
    Loops until interrupted with Ctrl-C.
    """
    strings_src = os.path.join(base_folder, "input-data", "$strings.csv")
    checkpoint_dir = os.path.join(base_folder, "checkpoints", "checkpoint-final")
    strings_dst = os.path.join(checkpoint_dir, "$strings.csv")
    os.makedirs(checkpoint_dir, exist_ok=True)
    if os.path.exists(strings_src):
        shutil.copy(strings_src, strings_dst)
    else:
        # no interned strings available: give the REPL an empty table
        open(strings_dst, 'a').close()
    repl = Repl(local_db_path=base_folder + "/checkpoints/checkpoint-final/")
    try:
        while True:
            repl.loop()
    except KeyboardInterrupt:
        repl.exit()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("slogfile", nargs='?', help="The file to be compiled and ran.")
    parser.add_argument("out_dir", nargs='?',
                        help="Where output information should be put. "
                             "This includes compiled files, build files, input data, and output data."
                             "FOR NOW THIS MUST BE A DIR NEXT TO THIS SCRIPT!!")
    parser.add_argument("-f", "--facts", help="The location of the input facts file/directory.")
    parser.add_argument("-s", "--server",
                        help="The location of the server. If this is used, server mode is used.")
    parser.add_argument("-j", "--cores",
                        help="The number of cores to compute with",
                        type=int, default=1)
    parser.add_argument("-v", "--verbose", action='store_true', help="Use verbose output.")
    parser.add_argument('-r', "--root",
                        help="The location of the slog project directory. "
                             "Defaults to the current working directory if not provided.")
    parser.add_argument("-d", "--debug", action="store_true",
                        help="running with valgrind in debug mode")
    parser.add_argument("-p", "--profile", action="store_true",
                        help="compile with `-pg` in cmake for using gprof(had better also recompile backend)")
    parser.add_argument('-cb', '--compile_backend', action='store_true',
                        help='compile the backend code before compile the generate c++ code.')
    parser.add_argument('-R', "--repl", action='store_true',
                        help="query the result database interactively")
    parser.add_argument('-co', '--compile_only', action='store_true',
                        help="JIT compile only without running it")
    parser.add_argument('-ov', "--overwrite_out", action="store_true",
                        help="overwrite, if output dir already exists.")
    # fix: help text was a copy-paste of --overwrite_out's
    parser.add_argument('-lb', "--local_db", action="store_true",
                        help="open a REPL over an existing local database in out_dir.")
    args = parser.parse_args()
    # fix: realpath(None) raises TypeError; slogfile is optional (nargs='?'),
    # e.g. --local_db mode does not need one.
    slogfile = os.path.realpath(args.slogfile) if args.slogfile else None
    if args.local_db:
        repl = Repl(local_db_path=f'{args.out_dir}/checkpoint-final')
        repl.loop()
    else:
        os.makedirs(args.out_dir, exist_ok=True)
        if args.server:
            # fix: cores and verbose were passed in swapped positions relative
            # to run_server(file, outloc, factloc, server, cores, verbose).
            run_server(slogfile, args.out_dir, args.facts, args.server + ":5108",
                       args.cores, args.verbose)
        else:
            run_local(slogfile, args.out_dir, args.facts, args.cores, args.verbose, args.root,
                      compile_backend=args.compile_backend, debug_flag=args.debug,
                      profile_flag=args.profile, repl_flag=args.repl,
                      compile_only=args.compile_only,
                      overwrite_out=args.overwrite_out)