Skip to content

Commit 277ea06

Browse files
authored
Merge pull request #27 from NucciTheBoss/cleaning_parallel
Add support for cleaning stage plugins
2 parents 2a3f493 + ffe4cd4 commit 277ea06

File tree

2 files changed

+146
-11
lines changed

2 files changed

+146
-11
lines changed

main.py

Lines changed: 130 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,10 @@
346346
raise FileNotFoundError(Fore.RED + "The plugin {} is not found. Please verify that you are using the correct file path.".format(
347347
clean_control["plot"][key]["plugin"]))
348348

349+
# Create plot directory to save plots
350+
print_info("Creating directory to save plots.")
351+
os.makedirs("data/plots", exist_ok=True)
352+
349353
print_info("Generating directive list for worker nodes.")
350354
# Generate and slice directive list that will be sent out to the workers
351355
clean_directive_list = sst.generate_clean(clean_control["plot"], ROOT_PATH + "/data/plots", ROOT_PATH + "/data")
@@ -603,7 +607,6 @@
603607
except subprocess.SubprocessError:
604608
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
605609

606-
607610
# Close the training plugin log file
608611
fout.close()
609612

@@ -635,6 +638,24 @@
635638
logger.warning("INFO: Received task list {} from manager.".format(task_list))
636639

637640
if task_list != []:
641+
logger.warning("INFO: Beginning cleaning stage plotting.")
642+
643+
for task in task_list:
644+
logger.warning("INFO: Generating plot {}.".format(task[2]))
645+
file_output = "data/.logs/worker-1/{}-plot-{}".format(TIME, task[2])
646+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
647+
fout = open(file_output, "wt")
648+
649+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
650+
651+
try:
652+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
653+
654+
except subprocess.SubprocessError:
655+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
656+
657+
fout.close()
658+
638659
comm.send(1, dest=0, tag=1)
639660

640661
else:
@@ -812,7 +833,6 @@
812833
except subprocess.SubprocessError:
813834
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
814835

815-
816836
# Close the training plugin log file
817837
fout.close()
818838

@@ -844,6 +864,24 @@
844864
logger.warning("INFO: Received task list {} from manager.".format(task_list))
845865

846866
if task_list != []:
867+
logger.warning("INFO: Beginning cleaning stage plotting.")
868+
869+
for task in task_list:
870+
logger.warning("INFO: Generating plot {}.".format(task[2]))
871+
file_output = "data/.logs/worker-2/{}-plot-{}".format(TIME, task[2])
872+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
873+
fout = open(file_output, "wt")
874+
875+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
876+
877+
try:
878+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
879+
880+
except subprocess.SubprocessError:
881+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
882+
883+
fout.close()
884+
847885
comm.send(1, dest=0, tag=2)
848886

849887
else:
@@ -1021,7 +1059,6 @@
10211059
except subprocess.SubprocessError:
10221060
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
10231061

1024-
10251062
# Close the training plugin log file
10261063
fout.close()
10271064

@@ -1053,6 +1090,24 @@
10531090
logger.warning("INFO: Received task list {} from manager.".format(task_list))
10541091

10551092
if task_list != []:
1093+
logger.warning("INFO: Beginning cleaning stage plotting.")
1094+
1095+
for task in task_list:
1096+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1097+
file_output = "data/.logs/worker-3/{}-plot-{}".format(TIME, task[2])
1098+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1099+
fout = open(file_output, "wt")
1100+
1101+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1102+
1103+
try:
1104+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1105+
1106+
except subprocess.SubprocessError:
1107+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1108+
1109+
fout.close()
1110+
10561111
comm.send(1, dest=0, tag=3)
10571112

10581113
else:
@@ -1230,7 +1285,6 @@
12301285
except subprocess.SubprocessError:
12311286
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
12321287

1233-
12341288
# Close the training plugin log file
12351289
fout.close()
12361290

@@ -1262,6 +1316,24 @@
12621316
logger.warning("INFO: Received task list {} from manager.".format(task_list))
12631317

12641318
if task_list != []:
1319+
logger.warning("INFO: Beginning cleaning stage plotting.")
1320+
1321+
for task in task_list:
1322+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1323+
file_output = "data/.logs/worker-4/{}-plot-{}".format(TIME, task[2])
1324+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1325+
fout = open(file_output, "wt")
1326+
1327+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1328+
1329+
try:
1330+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1331+
1332+
except subprocess.SubprocessError:
1333+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1334+
1335+
fout.close()
1336+
12651337
comm.send(1, dest=0, tag=4)
12661338

12671339
else:
@@ -1439,7 +1511,6 @@
14391511
except subprocess.SubprocessError:
14401512
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
14411513

1442-
14431514
# Close the training plugin log file
14441515
fout.close()
14451516

@@ -1471,6 +1542,24 @@
14711542
logger.warning("INFO: Received task list {} from manager.".format(task_list))
14721543

14731544
if task_list != []:
1545+
logger.warning("INFO: Beginning cleaning stage plotting.")
1546+
1547+
for task in task_list:
1548+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1549+
file_output = "data/.logs/worker-5/{}-plot-{}".format(TIME, task[2])
1550+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1551+
fout = open(file_output, "wt")
1552+
1553+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1554+
1555+
try:
1556+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1557+
1558+
except subprocess.SubprocessError:
1559+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1560+
1561+
fout.close()
1562+
14741563
comm.send(1, dest=0, tag=5)
14751564

14761565
else:
@@ -1648,7 +1737,6 @@
16481737
except subprocess.SubprocessError:
16491738
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
16501739

1651-
16521740
# Close the training plugin log file
16531741
fout.close()
16541742

@@ -1680,6 +1768,24 @@
16801768
logger.warning("INFO: Received task list {} from manager.".format(task_list))
16811769

16821770
if task_list != []:
1771+
logger.warning("INFO: Beginning cleaning stage plotting.")
1772+
1773+
for task in task_list:
1774+
logger.warning("INFO: Generating plot {}.".format(task[2]))
1775+
file_output = "data/.logs/worker-6/{}-plot-{}".format(TIME, task[2])
1776+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
1777+
fout = open(file_output, "wt")
1778+
1779+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
1780+
1781+
try:
1782+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
1783+
1784+
except subprocess.SubprocessError:
1785+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
1786+
1787+
fout.close()
1788+
16831789
comm.send(1, dest=0, tag=6)
16841790

16851791
else:
@@ -1857,7 +1963,6 @@
18571963
except subprocess.SubprocessError:
18581964
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))
18591965

1860-
18611966
# Close the training plugin log file
18621967
fout.close()
18631968

@@ -1889,6 +1994,24 @@
18891994
logger.warning("INFO: Received task list {} from manager.".format(task_list))
18901995

18911996
if task_list != []:
1997+
logger.warning("INFO: Beginning cleaning stage plotting.")
1998+
1999+
for task in task_list:
2000+
logger.warning("INFO: Generating plot {}.".format(task[2]))
2001+
file_output = "data/.logs/worker-7/{}-plot-{}".format(TIME, task[2])
2002+
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
2003+
fout = open(file_output, "wt")
2004+
2005+
clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)
2006+
2007+
try:
2008+
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)
2009+
2010+
except subprocess.SubprocessError:
2011+
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))
2012+
2013+
fout.close()
2014+
18922015
comm.send(1, dest=0, tag=7)
18932016

18942017
else:

utils/workerops/paramfactory.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,28 @@ def attack_train_factory(adver_features: List[str], model_labels: np.ndarray,
142142
return pickle_path
143143

144144

145-
def clean_factory() -> str:
145+
def clean_factory(models: List[str], plot_name: str, save_path: str, root_path: str) -> str:
146146
"""
147147
Generate parameter dictionary that will be sent out to the cleaning plugins for the cleaning stage.
148148
Save as a pickle and return a file path reference to that pickle.
149149
150150
### Parameters:
151-
- TODO
151+
:param models: List of root model directories containing data for plots.
152+
:param plot_name: Name to use for user-generated plot file.
153+
:param save_path: System location save the adversarial examples.
154+
:param root_path: Root directory of Jespipe.
152155
153156
### Returns:
154157
:return: System file path reference to pickled parameter dictionary.
155158
"""
156-
# TODO: Update this function once you revisit the cleaning stage next week
157-
pass
159+
d = dict()
160+
161+
d["model_list"] = models
162+
d["plot_name"] = plot_name
163+
d["save_path"] = save_path
164+
165+
# Establish path to file in .tmp directory and dump dictionary
166+
pickle_path = root_path + "/data/.tmp/" + str(uuid.uuid4()) + ".pkl"
167+
joblib.dump(d, pickle_path)
168+
169+
return pickle_path

0 commit comments

Comments
 (0)