Skip to content

Commit ffe4cd4

Browse files
committed
Expand cleaning stage to rest of the worker nodes
1 parent 42628b0 commit ffe4cd4

File tree

1 file changed

+126
-7
lines changed

1 file changed

+126
-7
lines changed

main.py

Lines changed: 126 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,6 @@
607607
except subprocess.SubprocessError:
608608
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
609609

610-
611610
# Close the training plugin log file
612611
fout.close()
613612

@@ -639,6 +638,24 @@
639638
logger.warning("INFO: Received task list {} from manager.".format(task_list))
640639

641640
if task_list != []:
641+
logger.warning("INFO: Beginning cleaning stage plotting.")
642+
643+
for task in task_list:
644+
logger.warning("INFO: Generating plot {}.".format(task[2]))
645+
file_output = "data/.logs/worker-1/{}-plot-{}".format(TIME, task[2])
646+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
647+
fout = open(file_output, "wt")
648+
649+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
650+
651+
try:
652+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
653+
654+
except subprocess.SubprocessError:
655+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
656+
657+
fout.close()
658+
642659
comm.send(1, dest=0, tag=1)
643660

644661
else:
@@ -816,7 +833,6 @@
816833
except subprocess.SubprocessError:
817834
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
818835

819-
820836
# Close the training plugin log file
821837
fout.close()
822838

@@ -848,6 +864,24 @@
848864
logger.warning("INFO: Received task list {} from manager.".format(task_list))
849865

850866
if task_list != []:
867+
logger.warning("INFO: Beginning cleaning stage plotting.")
868+
869+
for task in task_list:
870+
logger.warning("INFO: Generating plot {}.".format(task[2]))
871+
file_output = "data/.logs/worker-2/{}-plot-{}".format(TIME, task[2])
872+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
873+
fout = open(file_output, "wt")
874+
875+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
876+
877+
try:
878+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
879+
880+
except subprocess.SubprocessError:
881+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
882+
883+
fout.close()
884+
851885
comm.send(1, dest=0, tag=2)
852886

853887
else:
@@ -1025,7 +1059,6 @@
10251059
except subprocess.SubprocessError:
10261060
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
10271061

1028-
10291062
# Close the training plugin log file
10301063
fout.close()
10311064

@@ -1057,6 +1090,24 @@
10571090
logger.warning("INFO: Received task list {} from manager.".format(task_list))
10581091

10591092
if task_list != []:
1093+
logger.warning("INFO: Beginning cleaning stage plotting.")
1094+
1095+
for task in task_list:
1096+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1097+
file_output = "data/.logs/worker-3/{}-plot-{}".format(TIME, task[2])
1098+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1099+
fout = open(file_output, "wt")
1100+
1101+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1102+
1103+
try:
1104+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1105+
1106+
except subprocess.SubprocessError:
1107+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1108+
1109+
fout.close()
1110+
10601111
comm.send(1, dest=0, tag=3)
10611112

10621113
else:
@@ -1234,7 +1285,6 @@
12341285
except subprocess.SubprocessError:
12351286
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
12361287

1237-
12381288
# Close the training plugin log file
12391289
fout.close()
12401290

@@ -1266,6 +1316,24 @@
12661316
logger.warning("INFO: Received task list {} from manager.".format(task_list))
12671317

12681318
if task_list != []:
1319+
logger.warning("INFO: Beginning cleaning stage plotting.")
1320+
1321+
for task in task_list:
1322+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1323+
file_output = "data/.logs/worker-4/{}-plot-{}".format(TIME, task[2])
1324+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1325+
fout = open(file_output, "wt")
1326+
1327+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1328+
1329+
try:
1330+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1331+
1332+
except subprocess.SubprocessError:
1333+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1334+
1335+
fout.close()
1336+
12691337
comm.send(1, dest=0, tag=4)
12701338

12711339
else:
@@ -1443,7 +1511,6 @@
14431511
except subprocess.SubprocessError:
14441512
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
14451513

1446-
14471514
# Close the training plugin log file
14481515
fout.close()
14491516

@@ -1475,6 +1542,24 @@
14751542
logger.warning("INFO: Received task list {} from manager.".format(task_list))
14761543

14771544
if task_list != []:
1545+
logger.warning("INFO: Beginning cleaning stage plotting.")
1546+
1547+
for task in task_list:
1548+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1549+
file_output = "data/.logs/worker-5/{}-plot-{}".format(TIME, task[2])
1550+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1551+
fout = open(file_output, "wt")
1552+
1553+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1554+
1555+
try:
1556+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1557+
1558+
except subprocess.SubprocessError:
1559+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1560+
1561+
fout.close()
1562+
14781563
comm.send(1, dest=0, tag=5)
14791564

14801565
else:
@@ -1652,7 +1737,6 @@
16521737
except subprocess.SubprocessError:
16531738
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
16541739

1655-
16561740
# Close the training plugin log file
16571741
fout.close()
16581742

@@ -1684,6 +1768,24 @@
16841768
logger.warning("INFO: Received task list {} from manager.".format(task_list))
16851769

16861770
if task_list != []:
1771+
logger.warning("INFO: Beginning cleaning stage plotting.")
1772+
1773+
for task in task_list:
1774+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1775+
file_output = "data/.logs/worker-6/{}-plot-{}".format(TIME, task[2])
1776+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1777+
fout = open(file_output, "wt")
1778+
1779+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1780+
1781+
try:
1782+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1783+
1784+
except subprocess.SubprocessError:
1785+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1786+
1787+
fout.close()
1788+
16871789
comm.send(1, dest=0, tag=6)
16881790

16891791
else:
@@ -1861,7 +1963,6 @@
18611963
except subprocess.SubprocessError:
18621964
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
18631965

1864-
18651966
# Close the training plugin log file
18661967
fout.close()
18671968

@@ -1893,6 +1994,24 @@
18931994
logger.warning("INFO: Received task list {} from manager.".format(task_list))
18941995

18951996
if task_list != []:
1997+
logger.warning("INFO: Beginning cleaning stage plotting.")
1998+
1999+
for task in task_list:
2000+
logger.warning("INFO: Generating plot {}.".format(task[2]))
2001+
file_output = "data/.logs/worker-7/{}-plot-{}".format(TIME, task[2])
2002+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
2003+
fout = open(file_output, "wt")
2004+
2005+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
2006+
2007+
try:
2008+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
2009+
2010+
except subprocess.SubprocessError:
2011+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
2012+
2013+
fout.close()
2014+
18962015
comm.send(1, dest=0, tag=7)
18972016

18982017
else:

0 commit comments

Comments
 (0)