Skip to content

Commit 5ad0f60

Browse files
authored
aph hub fixes (#294)
* adding some code to make it easier to avoid spurious warnings
* REVERT part of the 'trying to get rid of spoke sleep' commit, which is an allreduce in hub.py (search for the word 'revert')
* revert part of intra-hub sync for aph
* aircond cylinders has the trace files hooked up
* fix indentation error
* adding a slurm script to demonstrate traces
* code for traces was already factored up
* missed the update
* Update hub.py: aph cannot have the synchronization barriers
* Update hub.py: typo from last commit
1 parent e054a2d commit 5ad0f60

File tree

4 files changed

+56
-39
lines changed

4 files changed

+56
-39
lines changed

Diff for: examples/aircond/aircond_cylinders.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ def main():
223223
scenario_denouement = refmodule.scenario_denouement
224224

225225
if cfg.EF_directly:
226+
"""
226227
ama_options = {"EF-mstage": not proper_bundles,
227228
"EF-2stage": proper_bundles,
228229
"EF_solver_name": cfg.solver_name,
@@ -231,8 +232,9 @@ def main():
231232
"_mpisppy_probability": 1/ScenCount, # is this needed?
232233
"tee_ef_solves":False,
233234
}
235+
"""
234236
ama = amalgamator.from_module(refmodule,
235-
ama_options,use_command_line=False)
237+
cfg, use_command_line=False)
236238
ama.run()
237239
print(f"EF inner bound=", ama.best_inner_bound)
238240
print(f"EF outer bound=", ama.best_outer_bound)
@@ -306,13 +308,17 @@ def main():
306308
soptions = option_string_to_dict(cfg.solver_options)
307309
hub_dict["opt_kwargs"]["options"]["iter0_solver_options"].update(soptions)
308310
hub_dict["opt_kwargs"]["options"]["iterk_solver_options"].update(soptions)
311+
for sd in list_of_spoke_dict:
312+
sd["opt_kwargs"]["options"]["iter0_solver_options"].update(soptions)
313+
sd["opt_kwargs"]["options"]["iterk_solver_options"].update(soptions)
314+
309315
if with_xhatspecific:
310316
xhatspecific_spoke["opt_kwargs"]["options"]["xhat_looper_options"]["xhat_solver_options"].update(soptions)
311317
if with_xhatshuffle:
312318
xhatshuffle_spoke["opt_kwargs"]["options"]["xhat_looper_options"]["xhat_solver_options"].update(soptions)
313-
for sd in list_of_spoke_dict:
314-
sd["opt_kwargs"]["options"]["iter0_solver_options"].update(soptions)
315-
sd["opt_kwargs"]["options"]["iterk_solver_options"].update(soptions)
319+
# special code to get a trace for xhatshuffle
320+
if with_xhatshuffle and cfg.trace_prefix is not None:
321+
xhatshuffle_spoke["opt_kwargs"]["options"]['shuffle_running_trace_prefix'] = cfg.trace_prefix
316322

317323
wheel = WheelSpinner(hub_dict, list_of_spoke_dict)
318324
wheel.spin()

Diff for: examples/aircond/trace.slurm

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/bin/bash -l
2+
3+
# Outputs a trace file (but throws an error if the file exists)
4+
5+
#!/bin/bash -l
6+
#SBATCH --job-name=aircond_demo_slurm
7+
#SBATCH --output=demo_slurm.out
8+
#SBATCH --ntasks=9
9+
#SBATCH --cpus-per-task=2
10+
#SBATCH --time=0-0:30:00
11+
#SBATCH --nodelist=c[3]
12+
13+
conda activate mpisppy
14+
SOLVERNAME="gurobi_persistent"
15+
export GRB_LICENSE_FILE=/home/dlwoodruff/software/gurobi950/licenses/c3/gurobi.lic
16+
17+
# TBD: aircond uses start-seed (but seed is allowed as an arg that is ignored).
18+
19+
# xhat output file name is hardwired
20+
mpiexec -np $SLURM_NTASKS python -m mpi4py aircond_cylinders.py --bundles-per-rank=0 --max-iterations=100 --default-rho=1 --solver-name=${SOLVERNAME} --branching-factors "20 5 4" --Capacity 200 --QuadShortCoeff 0.3 --BeginInventory 50 --rel-gap 0.001 --mu-dev 0 --sigma-dev 40 --max-solver-threads 2 --start-seed 0 --start-ups --lagrangian --xhatshuffle --trace-prefix ${SLURM_JOB_NAME}_
21+
22+
# --no-lagrangian --with-lagranger --lagranger-rho-rescale-factors-json lagranger_factors.json
23+
# --with-lagrangian --no-lagranger

Diff for: mpisppy/cylinders/hub.py

+23-15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import time
77
import mpisppy.log
8+
from mpisppy.opt.aph import APH
89

910
from mpisppy import MPI
1011
from mpisppy.cylinders.spcommunicator import SPCommunicator
@@ -347,7 +348,8 @@ def hub_to_spoke(self, values, spoke_strata_rank):
347348
f"into local buffer of length {expected_length}"
348349
)
349350
# this is so the spoke ranks all get the same write_id at approximately the same time
350-
self.cylinder_comm.Barrier()
351+
if not isinstance(self.opt, APH):
352+
self.cylinder_comm.Barrier()
351353
self.local_write_ids[spoke_strata_rank - 1] += 1
352354
values[-1] = self.local_write_ids[spoke_strata_rank - 1]
353355
window = self.windows[spoke_strata_rank - 1]
@@ -370,25 +372,31 @@ def hub_from_spoke(self, values, spoke_num):
370372
)
371373
# so the window in each rank gets read at approximately the same time,
372374
# and so has the same write_id
373-
self.cylinder_comm.Barrier()
375+
if not isinstance(self.opt, APH):
376+
self.cylinder_comm.Barrier()
374377
window = self.windows[spoke_num - 1]
375378
window.Lock(spoke_num)
376379
window.Get((values, len(values), MPI.DOUBLE), spoke_num)
377380
window.Unlock(spoke_num)
378381

379-
new_id = int(values[-1])
380-
local_val = np.array((new_id,), 'i')
381-
sum_ids = np.zeros(1, 'i')
382-
self.cylinder_comm.Allreduce((local_val, MPI.INT),
383-
(sum_ids, MPI.INT),
384-
op=MPI.SUM)
385-
386-
if new_id != sum_ids[0] / self.cylinder_comm.size:
387-
return False
388-
389-
if (new_id > self.remote_write_ids[spoke_num - 1]) or (new_id < 0):
390-
self.remote_write_ids[spoke_num - 1] = new_id
391-
return True
382+
if isinstance(self.opt, APH):
383+
# Reverting part of Ben's 'trying to get rid of spoke sleep' changes (DLW, Jan 2023)
384+
if values[-1] > self.remote_write_ids[spoke_num - 1]:
385+
self.remote_write_ids[spoke_num - 1] = values[-1]
386+
return True
387+
else:
388+
new_id = int(values[-1])
389+
local_val = np.array((new_id,), 'i')
390+
sum_ids = np.zeros(1, 'i')
391+
self.cylinder_comm.Allreduce((local_val, MPI.INT),
392+
(sum_ids, MPI.INT),
393+
op=MPI.SUM)
394+
if new_id != sum_ids[0] / self.cylinder_comm.size:
395+
return False
396+
397+
if (new_id > self.remote_write_ids[spoke_num - 1]) or (new_id < 0):
398+
self.remote_write_ids[spoke_num - 1] = new_id
399+
return True
392400
return False
393401

394402
def send_terminate(self):

Diff for: mpisppy/cylinders/xhatshufflelooper_bounder.py

-20
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,6 @@ def xhatbase_prep(self):
3232
self.verbose = self.opt.options["verbose"] # typing aid
3333
self.solver_options = self.opt.options["xhat_looper_options"]["xhat_solver_options"]
3434

35-
# Start code to support running trace. TBD: factor this up?
36-
if self.cylinder_rank == 0 and \
37-
'suffle_running_trace_prefix' in self.opt.options and \
38-
self.opt.options['shuffle_running_trace_prefix'] is not None:
39-
running_trace_prefix =\
40-
self.opt.options['shuffle_running_trace_prefix']
41-
42-
filen = running_trace_prefix+self.__class__.__name__+'.csv'
43-
if os.path.exists(filen):
44-
raise RuntimeError(f"running trace file {filen} already exists!")
45-
with open(filen, 'w') as f:
46-
f.write("time,scen,value\n")
47-
self.running_trace_filen = filen
48-
else:
49-
self.running_trace_filen = None
50-
# end code to support running trace
51-
5235
if not isinstance(self.opt, Xhat_Eval):
5336
raise RuntimeError("XhatShuffleInnerBound must be used with Xhat_Eval.")
5437

@@ -93,9 +76,6 @@ def _vb(msg):
9376
if self.verbose and self.opt.cylinder_rank == 0:
9477
print ("(rank0) " + msg)
9578

96-
if self.running_trace_filen is not None:
97-
with open(self.running_trace_filen, "a") as f:
98-
f.write(f"{time.time()},{snamedict},{obj}\n")
9979
if obj is None:
10080
_vb(f" Infeasible {snamedict}")
10181
return False

0 commit comments

Comments
 (0)