###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
import numpy as np
import abc
import logging
import mpisppy.log
from mpisppy.opt.subgradient import Subgradient
from mpisppy.opt.aph import APH
from mpisppy import MPI
from mpisppy.cylinders.spcommunicator import SPCommunicator, communicator_array
from math import inf, isnan
from mpisppy.cylinders.spoke import ConvergerSpokeType
from mpisppy import global_toc
# Could also pass, e.g., sys.stdout instead of a filename
mpisppy.log.setup_logger("mpisppy.cylinders.Hub",
"hub.log",
level=logging.CRITICAL)
logger = logging.getLogger("mpisppy.cylinders.Hub")
class Hub(SPCommunicator):
def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, spokes, options=None):
super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, options=options)
assert len(spokes) == self.n_spokes
self.local_write_ids = np.zeros(self.n_spokes, dtype=np.int64)
self.remote_write_ids = np.zeros(self.n_spokes, dtype=np.int64)
self.local_lengths = np.zeros(self.n_spokes, dtype=np.int64)
self.remote_lengths = np.zeros(self.n_spokes, dtype=np.int64)
        # ^^^ these lengths do NOT include the extra +1 slot used for the write_id
self.spokes = spokes # List of dicts
logger.debug(f"Built the hub object on global rank {fullcomm.Get_rank()}")
# for logging
self.print_init = True
self.latest_ib_char = None
self.latest_ob_char = None
self.last_ib_idx = None
self.last_ob_idx = None
# for termination based on stalling out
self.stalled_iter_cnt = 0
self.last_gap = float('inf') # abs_gap tracker
@abc.abstractmethod
def setup_hub(self):
pass
@abc.abstractmethod
def sync(self):
""" To be called within the whichever optimization algorithm
is being run on the hub (e.g. PH)
"""
pass
@abc.abstractmethod
def is_converged(self):
""" The hub has the ability to halt the optimization algorithm on the
hub before any local convergers.
"""
pass
@abc.abstractmethod
def current_iteration(self):
""" Returns the current iteration count - however the hub defines it.
"""
pass
@abc.abstractmethod
def main(self):
pass
def clear_latest_chars(self):
self.latest_ib_char = None
self.latest_ob_char = None
def compute_gaps(self):
""" Compute the current absolute and relative gaps,
using the current self.BestInnerBound and self.BestOuterBound
"""
if self.opt.is_minimizing:
abs_gap = self.BestInnerBound - self.BestOuterBound
else:
abs_gap = self.BestOuterBound - self.BestInnerBound
        ## define the relative gap with respect to the best outer bound, as is common
        # NOTE: NaN compares unequal to everything (including itself), so use
        #       isnan rather than an equality test to detect it.
        if (
            not isnan(abs_gap)
            and abs_gap != float("inf")
            and abs_gap != float("-inf")
            and not isnan(self.BestOuterBound)
            and self.BestOuterBound != 0
        ):
rel_gap = abs_gap / abs(self.BestOuterBound)
else:
rel_gap = float("inf")
return abs_gap, rel_gap
def get_update_string(self):
if self.latest_ib_char is None and \
self.latest_ob_char is None:
return ' '
if self.latest_ib_char is None:
return self.latest_ob_char + ' '
if self.latest_ob_char is None:
return ' ' + self.latest_ib_char
return self.latest_ob_char+' '+self.latest_ib_char
def screen_trace(self):
current_iteration = self.current_iteration()
abs_gap, rel_gap = self.compute_gaps()
best_solution = self.BestInnerBound
best_bound = self.BestOuterBound
update_source = self.get_update_string()
if self.print_init:
row = f'{"Iter.":>5s} {" "} {"Best Bound":>14s} {"Best Incumbent":>14s} {"Rel. Gap":>12s} {"Abs. Gap":>14s}'
global_toc(row, True)
self.print_init = False
row = f"{current_iteration:5d} {update_source} {best_bound:14.4f} {best_solution:14.4f} {rel_gap*100:12.3f}% {abs_gap:14.4f}"
global_toc(row, True)
self.clear_latest_chars()
def determine_termination(self):
# return True if termination is indicated, otherwise return False
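        # A hedged sketch of the termination options consulted below; the keys
        # are the ones this method checks, the values are purely illustrative
        # (they are not defaults taken from mpi-sppy):
        #   options = {"rel_gap": 1e-3, "abs_gap": 1.0, "max_stalled_iters": 100}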
if not hasattr(self,"options") or self.options is None\
or ("rel_gap" not in self.options and "abs_gap" not in self.options\
and "max_stalled_iters" not in self.options):
return False # Nothing to see here folks...
# If we are still here, there is some option for termination
abs_gap, rel_gap = self.compute_gaps()
abs_gap_satisfied = False
rel_gap_satisfied = False
max_stalled_satisfied = False
if "rel_gap" in self.options and rel_gap <= self.options["rel_gap"]:
rel_gap_satisfied = True
if "abs_gap" in self.options and abs_gap <= self.options["abs_gap"]:
abs_gap_satisfied = True
if "max_stalled_iters" in self.options:
if abs_gap < self.last_gap: # liberal test (we could use an epsilon)
self.last_gap = abs_gap
self.stalled_iter_cnt = 0
else:
self.stalled_iter_cnt += 1
if self.stalled_iter_cnt >= self.options["max_stalled_iters"]:
max_stalled_satisfied = True
if abs_gap_satisfied:
global_toc(f"Terminating based on inter-cylinder absolute gap {abs_gap:12.4f}")
if rel_gap_satisfied:
global_toc(f"Terminating based on inter-cylinder relative gap {rel_gap*100:12.3f}%")
if max_stalled_satisfied:
global_toc(f"Terminating based on max-stalled-iters {self.stalled_iter_cnt}")
return abs_gap_satisfied or rel_gap_satisfied or max_stalled_satisfied
def hub_finalize(self):
if self.has_outerbound_spokes:
self.receive_outerbounds()
if self.has_innerbound_spokes:
self.receive_innerbounds()
if self.global_rank == 0:
self.print_init = True
global_toc("Statistics at termination", True)
self.screen_trace()
def receive_innerbounds(self):
""" Get inner bounds from inner bound spokes
NOTE: Does not check if there _are_ innerbound spokes
(but should be harmless to call if there are none)
"""
logging.debug("Hub is trying to receive from InnerBounds")
for idx in self.innerbound_spoke_indices:
is_new = self.hub_from_spoke(self.innerbound_receive_buffers[idx], idx)
if is_new:
bound = self.innerbound_receive_buffers[idx][0]
logging.debug("!! new InnerBound to opt {}".format(bound))
self.BestInnerBound = self.InnerBoundUpdate(bound, idx)
logging.debug("ph back from InnerBounds")
def receive_outerbounds(self):
""" Get outer bounds from outer bound spokes
NOTE: Does not check if there _are_ outerbound spokes
(but should be harmless to call if there are none)
"""
logging.debug("Hub is trying to receive from OuterBounds")
for idx in self.outerbound_spoke_indices:
is_new = self.hub_from_spoke(self.outerbound_receive_buffers[idx], idx)
if is_new:
bound = self.outerbound_receive_buffers[idx][0]
logging.debug("!! new OuterBound to opt {}".format(bound))
self.BestOuterBound = self.OuterBoundUpdate(bound, idx)
logging.debug("ph back from OuterBounds")
def OuterBoundUpdate(self, new_bound, idx=None, char='*'):
current_bound = self.BestOuterBound
if self._outer_bound_update(new_bound, current_bound):
if idx is None:
self.latest_ob_char = char
self.last_ob_idx = 0
else:
self.latest_ob_char = self.outerbound_spoke_chars[idx]
self.last_ob_idx = idx
return new_bound
else:
return current_bound
def InnerBoundUpdate(self, new_bound, idx=None, char='*'):
current_bound = self.BestInnerBound
if self._inner_bound_update(new_bound, current_bound):
if idx is None:
self.latest_ib_char = char
self.last_ib_idx = 0
else:
self.latest_ib_char = self.innerbound_spoke_chars[idx]
self.last_ib_idx = idx
return new_bound
else:
return current_bound
def initialize_bound_values(self):
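        # For a minimizing problem the inner bound tracks the best incumbent
        # (an upper bound) and the outer bound tracks the best relaxation
        # (a lower bound); the lambdas below keep whichever bound is tighter.
        # For maximization the roles are reversed.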
if self.opt.is_minimizing:
self.BestInnerBound = inf
self.BestOuterBound = -inf
self._inner_bound_update = lambda new, old : (new < old)
self._outer_bound_update = lambda new, old : (new > old)
else:
self.BestInnerBound = -inf
self.BestOuterBound = inf
self._inner_bound_update = lambda new, old : (new > old)
self._outer_bound_update = lambda new, old : (new < old)
def initialize_outer_bound_buffers(self):
""" Initialize value of BestOuterBound, and outer bound receive buffers
"""
self.outerbound_receive_buffers = dict()
for idx in self.outerbound_spoke_indices:
self.outerbound_receive_buffers[idx] = communicator_array(
self.remote_lengths[idx - 1]
)
def initialize_inner_bound_buffers(self):
""" Initialize value of BestInnerBound, and inner bound receive buffers
"""
self.innerbound_receive_buffers = dict()
for idx in self.innerbound_spoke_indices:
self.innerbound_receive_buffers[idx] = communicator_array(
self.remote_lengths[idx - 1]
)
def initialize_nonants(self):
""" Initialize the buffer for the hub to send nonants
to the appropriate spokes
"""
self.nonant_send_buffer = None
for idx in self.nonant_spoke_indices:
if self.nonant_send_buffer is None:
# for hub outer/inner bounds and kill signal
self.nonant_send_buffer = communicator_array(self.local_lengths[idx - 1])
elif self.local_lengths[idx - 1] + 1 != len(self.nonant_send_buffer):
raise RuntimeError("Nonant buffers disagree on size")
def initialize_boundsout(self):
""" Initialize the buffer for the hub to send bounds
to bounds only spokes
"""
self.boundsout_send_buffer = None
for idx in self.bounds_only_indices:
if self.boundsout_send_buffer is None:
self.boundsout_send_buffer = communicator_array(self.local_lengths[idx - 1])
if self.local_lengths[idx - 1] != 2:
raise RuntimeError(f'bounds only local length buffers must be 2 (bounds). Currently {self.local_lengths[idx - 1]}')
def _populate_boundsout_cache(self, buf):
""" Populate a given buffer with the current bounds
"""
buf[-3] = self.BestOuterBound
buf[-2] = self.BestInnerBound
def send_boundsout(self):
""" Send bounds to the appropriate spokes
This is called only for spokes which are bounds only.
w and nonant spokes are passed bounds through the w and nonant buffers
"""
self._populate_boundsout_cache(self.boundsout_send_buffer)
logging.debug("hub is sending bounds={}".format(self.boundsout_send_buffer))
for idx in self.bounds_only_indices:
self.hub_to_spoke(self.boundsout_send_buffer, idx)
def initialize_spoke_indices(self):
""" Figure out what types of spokes we have,
and sort them into the appropriate classes.
Note:
Some spokes may be multiple types (e.g. outerbound and nonant),
though not all combinations are supported.
"""
self.outerbound_spoke_indices = set()
self.innerbound_spoke_indices = set()
self.nonant_spoke_indices = set()
self.w_spoke_indices = set()
self.outerbound_spoke_chars = dict()
self.innerbound_spoke_chars = dict()
for (i, spoke) in enumerate(self.spokes):
spoke_class = spoke["spoke_class"]
if hasattr(spoke_class, "converger_spoke_types"):
for cst in spoke_class.converger_spoke_types:
if cst == ConvergerSpokeType.OUTER_BOUND:
self.outerbound_spoke_indices.add(i + 1)
self.outerbound_spoke_chars[i+1] = spoke_class.converger_spoke_char
elif cst == ConvergerSpokeType.INNER_BOUND:
self.innerbound_spoke_indices.add(i + 1)
self.innerbound_spoke_chars[i+1] = spoke_class.converger_spoke_char
elif cst == ConvergerSpokeType.W_GETTER:
self.w_spoke_indices.add(i + 1)
elif cst == ConvergerSpokeType.NONANT_GETTER:
self.nonant_spoke_indices.add(i + 1)
else:
raise RuntimeError(f"Unrecognized converger_spoke_type {cst}")
            else:  ## this is not necessarily wrong, e.g., for cut-generator spokes
logger.debug(f"Spoke class {spoke_class} not recognized by hub")
# all _BoundSpoke spokes get hub bounds so we determine which spokes
# are "bounds only"
self.bounds_only_indices = \
(self.outerbound_spoke_indices | self.innerbound_spoke_indices) - \
(self.w_spoke_indices | self.nonant_spoke_indices)
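        # e.g., a spoke that only reports an outer bound (and consumes neither
        # Ws nor nonants) lands in bounds_only_indices and is fed bounds
        # through send_boundsout() instead of the W/nonant buffers.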
self.has_outerbound_spokes = len(self.outerbound_spoke_indices) > 0
self.has_innerbound_spokes = len(self.innerbound_spoke_indices) > 0
self.has_nonant_spokes = len(self.nonant_spoke_indices) > 0
self.has_w_spokes = len(self.w_spoke_indices) > 0
self.has_bounds_only_spokes = len(self.bounds_only_indices) > 0
# Not all opt classes may have extensions
if getattr(self.opt, "extensions", None) is not None:
self.opt.extobject.initialize_spoke_indices()
def make_windows(self):
if self._windows_constructed:
# different parts of the hub may call make_windows,
# we just care about the first call
return
# Spokes notify the hub of the buffer sizes
for i in range(self.n_spokes):
pair_of_sizes = np.zeros(2, dtype="i")
self.strata_comm.Recv((pair_of_sizes, MPI.INT), source=i + 1, tag=i + 1)
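            # From the hub's perspective, pair_of_sizes[0] is the length of the
            # spoke-to-hub (remote) buffer and pair_of_sizes[1] the length of
            # the hub-to-spoke (local) buffer; neither includes the trailing
            # write_id slot.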
self.remote_lengths[i] = pair_of_sizes[0]
self.local_lengths[i] = pair_of_sizes[1]
# Make the windows of the appropriate buffer sizes
self.windows = [None for _ in range(self.n_spokes)]
self.buffers = [None for _ in range(self.n_spokes)]
for i in range(self.n_spokes):
length = self.local_lengths[i]
win, buff = self._make_window(length)
self.windows[i] = win
self.buffers[i] = buff
# flag this for multiple calls from the hub
self._windows_constructed = True
def hub_to_spoke(self, values, spoke_strata_rank):
""" Put the specified values into the specified locally-owned buffer
for the spoke to pick up.
Notes:
This automatically does the -1 indexing
This assumes that values contains a slot at the end for the
write_id
"""
expected_length = self.local_lengths[spoke_strata_rank - 1] + 1
if len(values) != expected_length:
raise RuntimeError(
f"Attempting to put array of length {len(values)} "
f"into local buffer of length {expected_length}"
)
# this is so the spoke ranks all get the same write_id at approximately the same time
if not isinstance(self.opt, APH):
self.cylinder_comm.Barrier()
self.local_write_ids[spoke_strata_rank - 1] += 1
values[-1] = self.local_write_ids[spoke_strata_rank - 1]
window = self.windows[spoke_strata_rank - 1]
window.Lock(self.strata_rank)
window.Put((values, len(values), MPI.DOUBLE), self.strata_rank)
window.Unlock(self.strata_rank)
def hub_from_spoke(self, values, spoke_num):
""" spoke_num is the rank in the strata_comm, so it is 1-based not 0-based
Returns:
is_new (bool): Indicates whether the "gotten" values are new,
based on the write_id.
"""
expected_length = self.remote_lengths[spoke_num - 1] + 1
if len(values) != expected_length:
raise RuntimeError(
f"Hub trying to get buffer of length {expected_length} "
f"from spoke, but provided buffer has length {len(values)}."
)
# so the window in each rank gets read at approximately the same time,
# and so has the same write_id
if not isinstance(self.opt, APH):
self.cylinder_comm.Barrier()
window = self.windows[spoke_num - 1]
window.Lock(spoke_num)
window.Get((values, len(values), MPI.DOUBLE), spoke_num)
window.Unlock(spoke_num)
if isinstance(self.opt, APH):
# reverting part of changes from Ben getting rid of spoke sleep DLW jan 2023
if values[-1] > self.remote_write_ids[spoke_num - 1]:
self.remote_write_ids[spoke_num - 1] = values[-1]
return True
else:
new_id = int(values[-1])
local_val = np.array((new_id,), 'i')
sum_ids = np.zeros(1, 'i')
self.cylinder_comm.Allreduce((local_val, MPI.INT),
(sum_ids, MPI.INT),
op=MPI.SUM)
if new_id != sum_ids[0] / self.cylinder_comm.size:
return False
if (new_id > self.remote_write_ids[spoke_num - 1]) or (new_id < 0):
self.remote_write_ids[spoke_num - 1] = new_id
return True
return False
def send_terminate(self):
""" Send an array of zeros with a -1 appended to the
end to indicate termination. This function puts to the local
buffer, so every spoke will see it simultaneously.
processes (don't need to call them one at a time).
"""
for rank in range(1, self.n_spokes + 1):
dummies = np.zeros(self.local_lengths[rank - 1] + 1)
dummies[-1] = -1
window = self.windows[rank - 1]
window.Lock(0)
window.Put((dummies, len(dummies), MPI.DOUBLE), 0)
window.Unlock(0)
class PHHub(Hub):
def setup_hub(self):
""" Must be called after make_windows(), so that
            the hub knows the sizes of all the spokes' windows
"""
if not self._windows_constructed:
raise RuntimeError(
"Cannot call setup_hub before memory windows are constructed"
)
self.initialize_spoke_indices()
self.initialize_bound_values()
if self.has_outerbound_spokes:
self.initialize_outer_bound_buffers()
if self.has_innerbound_spokes:
self.initialize_inner_bound_buffers()
if self.has_w_spokes:
self.initialize_ws()
if self.has_nonant_spokes:
self.initialize_nonants()
if self.has_bounds_only_spokes:
self.initialize_boundsout() # bounds going out
## Do some checking for things we currently don't support
if len(self.outerbound_spoke_indices & self.innerbound_spoke_indices) > 0:
raise RuntimeError(
"A Spoke providing both inner and outer "
"bounds is currently unsupported"
)
if len(self.w_spoke_indices & self.nonant_spoke_indices) > 0:
raise RuntimeError(
"A Spoke needing both Ws and nonants is currently unsupported"
)
## Generate some warnings if nothing is giving bounds
if not self.has_outerbound_spokes:
            logger.warning(
"No OuterBound Spokes defined, this converger "
"will not cause the hub to terminate"
)
if not self.has_innerbound_spokes:
            logger.warning(
"No InnerBound Spokes defined, this converger "
"will not cause the hub to terminate"
)
if self.opt.extensions is not None:
self.opt.extobject.setup_hub()
def sync(self):
"""
Manages communication with Spokes
"""
if self.has_w_spokes:
self.send_ws()
if self.has_nonant_spokes:
self.send_nonants()
if self.has_bounds_only_spokes:
self.send_boundsout()
if self.has_outerbound_spokes:
self.receive_outerbounds()
if self.has_innerbound_spokes:
self.receive_innerbounds()
if self.opt.extensions is not None:
self.opt.extobject.sync_with_spokes()
def sync_with_spokes(self):
self.sync()
def is_converged(self):
if self.opt.best_bound_obj_val is not None:
self.BestOuterBound = self.OuterBoundUpdate(self.opt.best_bound_obj_val)
if not self.has_innerbound_spokes:
if self.opt._PHIter == 1:
logger.warning(
"PHHub cannot compute convergence without "
"inner bound spokes."
)
## you still want to output status, even without inner bounders configured
if self.global_rank == 0:
self.screen_trace()
return False
if not self.has_outerbound_spokes:
if self.opt._PHIter == 1 and not isinstance(self.opt, Subgradient):
global_toc(
"Without outer bound spokes, no progress "
"will be made on the Best Bound")
## log some output
if self.global_rank == 0:
self.screen_trace()
return self.determine_termination()
def current_iteration(self):
""" Return the current PH iteration."""
return self.opt._PHIter
def main(self):
""" SPComm gets attached in self.__init__ """
self.opt.ph_main(finalize=False)
def finalize(self):
""" does PH.post_loops, returns Eobj """
Eobj = self.opt.post_loops(self.opt.extensions)
return Eobj
def send_nonants(self):
""" Gather nonants and send them to the appropriate spokes
TODO: Will likely fail with bundling
"""
self.opt._save_nonants()
ci = 0 ## index to self.nonant_send_buffer
nonant_send_buffer = self.nonant_send_buffer
for k, s in self.opt.local_scenarios.items():
for xvar in s._mpisppy_data.nonant_indices.values():
nonant_send_buffer[ci] = xvar._value
ci += 1
logging.debug("hub is sending X nonants={}".format(nonant_send_buffer))
self._populate_boundsout_cache(nonant_send_buffer)
for idx in self.nonant_spoke_indices:
self.hub_to_spoke(nonant_send_buffer, idx)
def initialize_ws(self):
""" Initialize the buffer for the hub to send dual weights
to the appropriate spokes
"""
self.w_send_buffer = None
for idx in self.w_spoke_indices:
if self.w_send_buffer is None:
self.w_send_buffer = communicator_array(self.local_lengths[idx - 1])
elif self.local_lengths[idx - 1] + 1 != len(self.w_send_buffer):
raise RuntimeError("W buffers disagree on size")
def send_ws(self):
""" Send dual weights to the appropriate spokes
"""
self.opt._populate_W_cache(self.w_send_buffer, padding=3)
logging.debug("hub is sending Ws={}".format(self.w_send_buffer))
self._populate_boundsout_cache(self.w_send_buffer)
for idx in self.w_spoke_indices:
self.hub_to_spoke(self.w_send_buffer, idx)
class LShapedHub(Hub):
def setup_hub(self):
""" Must be called after make_windows(), so that
            the hub knows the sizes of all the spokes' windows
"""
if not self._windows_constructed:
raise RuntimeError(
"Cannot call setup_hub before memory windows are constructed"
)
self.initialize_spoke_indices()
self.initialize_bound_values()
if self.has_outerbound_spokes:
self.initialize_outer_bound_buffers()
if self.has_innerbound_spokes:
self.initialize_inner_bound_buffers()
## Do some checking for things we currently
## do not support
if self.has_w_spokes:
raise RuntimeError("LShaped hub does not compute dual weights (Ws)")
if self.has_nonant_spokes:
self.initialize_nonants()
if len(self.outerbound_spoke_indices & self.innerbound_spoke_indices) > 0:
raise RuntimeError(
"A Spoke providing both inner and outer "
"bounds is currently unsupported"
)
## Generate some warnings if nothing is giving bounds
if not self.has_innerbound_spokes:
            logger.warning(
"No InnerBound Spokes defined, this converger "
"will not cause the hub to terminate"
)
def sync(self, send_nonants=True):
"""
Manages communication with Bound Spokes
"""
if send_nonants and self.has_nonant_spokes:
self.send_nonants()
if self.has_outerbound_spokes:
self.receive_outerbounds()
if self.has_innerbound_spokes:
self.receive_innerbounds()
# in case LShaped ever gets extensions
if getattr(self.opt, "extensions", None) is not None:
self.opt.extobject.sync_with_spokes()
def is_converged(self):
""" Returns a boolean. If True, then LShaped will terminate
Side-effects:
            The L-shaped method produces outer bounds during execution,
            so the best outer bound is also updated here.
"""
bound = self.opt._LShaped_bound
self.BestOuterBound = self.OuterBoundUpdate(bound)
## log some output
if self.global_rank == 0:
self.screen_trace()
return self.determine_termination()
def current_iteration(self):
""" Return the current L-shaped iteration."""
return self.opt.iter
def main(self):
""" SPComm gets attached in self.__init__ """
self.opt.lshaped_algorithm()
def send_nonants(self):
""" Gather nonants and send them to the appropriate spokes
TODO: Will likely fail with bundling
"""
ci = 0 ## index to self.nonant_send_buffer
nonant_send_buffer = self.nonant_send_buffer
for k, s in self.opt.local_scenarios.items():
nonant_to_root_var_map = s._mpisppy_model.subproblem_to_root_vars_map
for xvar in s._mpisppy_data.nonant_indices.values():
## Grab the value from the associated root variable
nonant_send_buffer[ci] = nonant_to_root_var_map[xvar]._value
ci += 1
logging.debug("hub is sending X nonants={}".format(nonant_send_buffer))
self._populate_boundsout_cache(nonant_send_buffer)
for idx in self.nonant_spoke_indices:
self.hub_to_spoke(nonant_send_buffer, idx)
class APHHub(PHHub):
def main(self):
""" SPComm gets attached by self.__init___; holding APH harmless """
logger.critical("aph debug main in hub.py")
self.opt.APH_main(spcomm=self, finalize=False)
def finalize(self):
""" does PH.post_loops, returns Eobj """
# NOTE: APH_main does NOT pass in extensions
# to APH.post_loops
Eobj = self.opt.post_loops()
return Eobj