-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathhub.py
450 lines (355 loc) · 15.2 KB
/
hub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
import abc
import logging
import math

import mpisppy.log
from mpisppy.cylinders.spcommunicator import RecvArray, SPCommunicator
from mpisppy import global_toc
from mpisppy.cylinders.spwindow import Field
# Configure this module's logger to write to "hub.log". Only CRITICAL
# records pass the configured level. A stream such as sys.stdout could
# be passed in place of the filename.
mpisppy.log.setup_logger(__name__, "hub.log", level=logging.CRITICAL)
logger = logging.getLogger(__name__)
class Hub(SPCommunicator):
    """ Base class for hub communicators.

    Tracks the best inner/outer bounds, prints the iteration trace,
    decides on gap- or stall-based termination and publishes the bounds
    and the shutdown signal. Concrete hubs must implement ``setup_hub``,
    ``sync``, ``is_converged``, ``current_iteration`` and ``main``.
    """

    send_fields = (*SPCommunicator.send_fields, Field.SHUTDOWN, Field.BEST_OBJECTIVE_BOUNDS,)
    receive_fields = (*SPCommunicator.receive_fields,)

    # Subclasses set this to True when the hub algorithm itself supplies
    # the best (outer) bound.
    _hub_algo_best_bound_provider = False

    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communicators, options=None):
        """ Forward all arguments to SPCommunicator and initialise the
            trace / stall-tracking state.
        """
        super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, communicators, options=options)

        logger.debug(f"Built the hub object on global rank {fullcomm.Get_rank()}")
        # for logging: print the trace header on the next screen_trace call
        self.print_init = True
        # for termination based on stalling out
        self.stalled_iter_cnt = 0
        self.last_gap = float('inf')  # abs_gap tracker

    @abc.abstractmethod
    def setup_hub(self):
        pass

    @abc.abstractmethod
    def sync(self):
        """ To be called within the whichever optimization algorithm
            is being run on the hub (e.g. PH)
        """
        pass

    @abc.abstractmethod
    def is_converged(self):
        """ The hub has the ability to halt the optimization algorithm on the
            hub before any local convergers.
        """
        pass

    @abc.abstractmethod
    def current_iteration(self):
        """ Returns the current iteration count - however the hub defines it.
        """
        pass

    @abc.abstractmethod
    def main(self):
        pass

    def clear_latest_chars(self):
        """ Reset the inner/outer bound update markers shown in the trace.
        """
        self.latest_ib_char = None
        self.latest_ob_char = None

    def compute_gaps(self):
        """ Compute the current absolute and relative gaps,
            using the current self.BestInnerBound and self.BestOuterBound

        Returns:
            tuple: ``(abs_gap, rel_gap)``. ``rel_gap`` is
            ``abs_gap / abs(self.BestOuterBound)``; it is ``inf`` when
            ``abs_gap`` is NaN or infinite, or when the outer bound is
            NaN or zero.
        """
        if self.opt.is_minimizing:
            abs_gap = self.BestInnerBound - self.BestOuterBound
        else:
            abs_gap = self.BestOuterBound - self.BestInnerBound

        # NaN compares unequal to everything (itself included), so a test of
        # the form `x != float("nan")` is always True and guards nothing.
        # Use the math predicates to detect NaN / +-inf.
        # NOTE(review): an earlier comment said the relative gap is defined
        # by the best solution, but the denominator is the outer bound --
        # confirm which is intended.
        if (
            math.isfinite(abs_gap)
            and not math.isnan(self.BestOuterBound)
            and self.BestOuterBound != 0
        ):
            rel_gap = abs_gap / abs(self.BestOuterBound)
        else:
            rel_gap = float("inf")
        return abs_gap, rel_gap

    def get_update_string(self):
        """ Build the update-marker column of the trace row.

        The outer-bound marker comes first and the inner-bound marker last,
        separated by a space; a missing marker is rendered as a blank so
        the column keeps a constant width (3 characters, assuming each
        marker is a single character -- TODO confirm).
        """
        ob_char = ' ' if self.latest_ob_char is None else self.latest_ob_char
        ib_char = ' ' if self.latest_ib_char is None else self.latest_ib_char
        return ob_char + ' ' + ib_char

    def screen_trace(self):
        """ Print one row of the iteration trace via global_toc, preceded
            by the column header when self.print_init is set, then clear
            the update markers.
        """
        current_iteration = self.current_iteration()
        abs_gap, rel_gap = self.compute_gaps()
        best_solution = self.BestInnerBound
        best_bound = self.BestOuterBound
        update_source = self.get_update_string()
        if self.print_init:
            # The blank second column lines up with the 3-character
            # update-marker column of the data rows.
            row = f'{"Iter.":>5s} {"":3s} {"Best Bound":>14s} {"Best Incumbent":>14s} {"Rel. Gap":>12s} {"Abs. Gap":>14s}'
            global_toc(row, True)
            self.print_init = False
        row = f"{current_iteration:5d} {update_source} {best_bound:14.4f} {best_solution:14.4f} {rel_gap*100:12.3f}% {abs_gap:14.4f}"
        global_toc(row, True)
        self.clear_latest_chars()

    def determine_termination(self):
        """ Decide whether the hub should terminate.

        Checks the "rel_gap", "abs_gap" and "max_stalled_iters" entries of
        self.options (whichever are present) and reports each satisfied
        criterion through global_toc.

        Returns:
            bool: True if any configured criterion is met, False otherwise
            (including when no criterion is configured).
        """
        options = getattr(self, "options", None)
        if options is None or not any(
            key in options for key in ("rel_gap", "abs_gap", "max_stalled_iters")
        ):
            return False  # Nothing to see here folks...

        # If we are still here, there is some option for termination
        abs_gap, rel_gap = self.compute_gaps()

        rel_gap_satisfied = "rel_gap" in options and rel_gap <= options["rel_gap"]
        abs_gap_satisfied = "abs_gap" in options and abs_gap <= options["abs_gap"]

        max_stalled_satisfied = False
        if "max_stalled_iters" in options:
            if abs_gap < self.last_gap:  # liberal test (we could use an epsilon)
                self.last_gap = abs_gap
                self.stalled_iter_cnt = 0
            else:
                self.stalled_iter_cnt += 1
                if self.stalled_iter_cnt >= options["max_stalled_iters"]:
                    max_stalled_satisfied = True

        if abs_gap_satisfied:
            global_toc(f"Terminating based on inter-cylinder absolute gap {abs_gap:12.4f}")
        if rel_gap_satisfied:
            global_toc(f"Terminating based on inter-cylinder relative gap {rel_gap*100:12.3f}%")
        if max_stalled_satisfied:
            global_toc(f"Terminating based on max-stalled-iters {self.stalled_iter_cnt}")

        return abs_gap_satisfied or rel_gap_satisfied or max_stalled_satisfied

    def hub_finalize(self):
        """ Receive the latest bounds and, on global rank 0, print the
            final trace (with a fresh header).
        """
        self.receive_outerbounds()
        self.receive_innerbounds()

        if self.global_rank == 0:
            self.print_init = True
            global_toc("Statistics at termination", True)
            self.screen_trace()

    def _populate_boundsout_cache(self, buf):
        """ Populate a given buffer with the current bounds
            (outer bound at index 0, inner bound at index 1)
        """
        buf[0] = self.BestOuterBound
        buf[1] = self.BestInnerBound

    def send_boundsout(self):
        """ Send bounds to the appropriate spokes
        """
        my_bounds = self.send_buffers[Field.BEST_OBJECTIVE_BOUNDS]
        self._populate_boundsout_cache(my_bounds.array())
        # Use the module logger with lazy %-formatting: the buffer is only
        # rendered if DEBUG is enabled, and the root logger is left alone.
        logger.debug("hub is sending bounds=%s", my_bounds)
        self.put_send_buffer(my_bounds, Field.BEST_OBJECTIVE_BOUNDS)

    def register_receive_fields(self):
        """ Figure out what types of spokes we have,
        and sort them into the appropriate classes.

        Note:
            Some spokes may be multiple types (e.g. outerbound and nonant),
            though not all combinations are supported.
        """
        super().register_receive_fields()

        # Not all opt classes may have extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.register_receive_fields()

    def register_send_fields(self):
        """ Register the send fields of this communicator and, when the
            opt object has extensions, those of its extension object.
        """
        super().register_send_fields()

        # Not all opt classes may have extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.register_send_fields()

    def send_terminate(self):
        """ Signal termination: write 1.0 into the first entry of the
            SHUTDOWN send buffer and publish that buffer.

            Per the original note, the put goes to the hub's local buffer,
            so the spokes do not need to be notified one at a time.
        """
        self.send_buffers[Field.SHUTDOWN][0] = 1.0
        self.put_send_buffer(self.send_buffers[Field.SHUTDOWN], Field.SHUTDOWN)

    def sync_bounds(self):
        """ Receive nonant bounds, outer bounds and inner bounds, then
            send out the current best bounds.
        """
        self.receive_nonant_bounds()
        self.receive_outerbounds()
        self.receive_innerbounds()
        self.send_boundsout()
class PHHub(Hub):
    """ Hub whose algorithm is run through ``self.opt.ph_main``.

    In addition to the Hub fields it sends the nonanticipative variable
    values (Field.NONANT) and the dual weights (Field.DUALS).
    """

    send_fields = (*Hub.send_fields, Field.NONANT, Field.DUALS)
    receive_fields = (*Hub.receive_fields,)

    def setup_hub(self):
        """ Warn about missing bound spokes and let the extension object,
            if any, set itself up for the hub.
        """
        ## Generate some warnings if nothing is giving bounds
        # Logger.warn is a deprecated alias; use Logger.warning.
        if not self.receive_field_spcomms[Field.OBJECTIVE_OUTER_BOUND]:
            logger.warning(
                "No OuterBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

        if not self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]:
            logger.warning(
                "No InnerBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

        if self.opt.extensions is not None:
            self.opt.extobject.setup_hub()

    def sync(self):
        """
            Manages communication with Spokes
        """
        self.sync_Ws()
        self.sync_nonants()
        self.sync_bounds()
        self.sync_extensions()

    def sync_with_spokes(self):
        """ Alias for sync(). """
        self.sync()

    def sync_extensions(self):
        """ Let the extension object, if any, sync with the spokes. """
        if self.opt.extensions is not None:
            self.opt.extobject.sync_with_spokes()

    def sync_nonants(self):
        """ Send the current nonants. """
        self.send_nonants()

    def sync_Ws(self):
        """ Send the current dual weights. """
        self.send_ws()

    def is_converged(self):
        """ Update the best bounds from the hub algorithm, print the trace
            on global rank 0 and test for termination.

        Returns:
            bool: False when no inner bound spokes are registered (no
            termination test is made in that case); otherwise the result
            of determine_termination().
        """
        if self.opt.best_bound_obj_val is not None:
            self.BestOuterBound = self.OuterBoundUpdate(self.opt.best_bound_obj_val)
        if self.opt.best_solution_obj_val is not None:
            self.BestInnerBound = self.InnerBoundUpdate(self.opt.best_solution_obj_val)

        if not self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]:
            if self.opt._PHIter == 1:
                logger.warning(
                    "PHHub cannot compute convergence without "
                    "inner bound spokes."
                )

            ## you still want to output status, even without inner bounders configured
            if self.global_rank == 0:
                self.screen_trace()

            return False

        if not self.receive_field_spcomms[Field.OBJECTIVE_OUTER_BOUND]:
            # Only mention this once, and not when the hub algorithm
            # provides the best bound itself.
            if self.opt._PHIter == 1 and not self._hub_algo_best_bound_provider:
                global_toc(
                    "Without outer bound spokes, no progress "
                    "will be made on the Best Bound")

        ## log some output
        if self.global_rank == 0:
            self.screen_trace()

        return self.determine_termination()

    def current_iteration(self):
        """ Return the current PH iteration."""
        return self.opt._PHIter

    def main(self):
        """ SPComm gets attached in self.__init__ """
        self.opt.ph_main(finalize=False)

    def finalize(self):
        """ does PH.post_loops, returns Eobj """
        Eobj = self.opt.post_loops(self.opt.extensions)
        return Eobj

    def send_nonants(self):
        """ Gather nonants and send them to the appropriate spokes
        """
        ci = 0  ## index into nonant_send_buffer
        nonant_send_buffer = self.send_buffers[Field.NONANT]
        # Only the scenario objects are needed, not their names.
        for s in self.opt.local_scenarios.values():
            for xvar in s._mpisppy_data.nonant_indices.values():
                nonant_send_buffer[ci] = xvar._value
                ci += 1
        # Module logger + lazy formatting: nothing is rendered unless DEBUG
        # is enabled, and the root logger is not touched.
        logger.debug("hub is sending X nonants=%s", nonant_send_buffer)
        self.put_send_buffer(nonant_send_buffer, Field.NONANT)

    def send_ws(self):
        """ Send dual weights to the appropriate spokes
        """
        # NOTE: my_ws.array() and self.w_send_buffer should be the same array.
        my_ws = self.send_buffers[Field.DUALS]
        self.opt._populate_W_cache(my_ws.array(), padding=1)
        logger.debug("hub is sending Ws=%s", my_ws.array())
        self.put_send_buffer(my_ws, Field.DUALS)
class LShapedHub(Hub):
    """ Hub whose algorithm is run through ``self.opt.lshaped_algorithm``.

    In addition to the Hub fields it sends the nonanticipative variable
    values (Field.NONANT), read from the root-problem variables.
    """

    send_fields = (*Hub.send_fields, Field.NONANT,)
    receive_fields = (*Hub.receive_fields,)

    def setup_hub(self):
        """ Warn when no inner bound spokes are registered. """
        ## Generate some warnings if nothing is giving bounds
        # Logger.warn is a deprecated alias; use Logger.warning.
        if not self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]:
            logger.warning(
                "No InnerBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

    def sync(self, send_nonants=True):
        """
        Manages communication with Bound Spokes

        Args:
            send_nonants (bool): when True (default), send the nonants
                before syncing the bounds.
        """
        if send_nonants:
            self.send_nonants()
        self.sync_bounds()
        # in case LShaped ever gets extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.sync_with_spokes()

    def is_converged(self):
        """ Returns a boolean. If True, then LShaped will terminate

        Side-effects:
            The L-shaped method produces outer bounds during execution,
            so we will check it as well.
        """
        bound = self.opt._LShaped_bound
        self.BestOuterBound = self.OuterBoundUpdate(bound)

        ## log some output
        if self.global_rank == 0:
            self.screen_trace()

        return self.determine_termination()

    def current_iteration(self):
        """ Return the current L-shaped iteration."""
        return self.opt.iter

    def main(self):
        """ SPComm gets attached in self.__init__ """
        self.opt.lshaped_algorithm()

    def send_nonants(self):
        """ Gather nonants and send them to the appropriate spokes
            TODO: Will likely fail with bundling
        """
        ci = 0  ## index into nonant_send_buffer
        nonant_send_buffer = self.send_buffers[Field.NONANT]
        # Only the scenario objects are needed, not their names.
        for s in self.opt.local_scenarios.values():
            nonant_to_root_var_map = s._mpisppy_model.subproblem_to_root_vars_map
            for xvar in s._mpisppy_data.nonant_indices.values():
                ## Grab the value from the associated root variable
                nonant_send_buffer[ci] = nonant_to_root_var_map[xvar]._value
                ci += 1
        # Module logger + lazy formatting: nothing is rendered unless DEBUG
        # is enabled, and the root logger is not touched.
        logger.debug("hub is sending X nonants=%s", nonant_send_buffer)
        self.put_send_buffer(nonant_send_buffer, Field.NONANT)
class SubgradientHub(PHHub):
    """ PHHub variant that runs ``self.opt.subgradient_main``.

    Send / receive fields are inherited unchanged from PHHub.
    """

    # The hub algorithm supplies the best bound itself.
    _hub_algo_best_bound_provider = True

    def main(self):
        """ Run the subgradient main loop without finalizing.

        SPComm gets attached in self.__init__
        """
        self.opt.subgradient_main(finalize=False)
class APHHub(PHHub):
    """ PHHub variant that runs ``self.opt.APH_main``.

    Send / receive fields are the same as PHHub.
    """

    def main(self):
        """ SPComm gets attached by self.__init___; holding APH harmless """
        # This is a debugging trace, so log it at DEBUG instead of CRITICAL;
        # otherwise it is written to the log on every run.
        logger.debug("aph debug main in hub.py")
        self.opt.APH_main(spcomm=self, finalize=False)

    # overwrite the default behavior of this method for APH
    def get_receive_buffer(self,
                           buf: RecvArray,
                           field: Field,
                           origin: int,
                           synchronize: bool = False,
                           ):
        """ Delegate to the parent implementation with ``synchronize``
            defaulting to False here.

            NOTE(review): presumably the parent's default for
            ``synchronize`` differs, which is what this override changes
            for APH -- confirm against SPCommunicator.
        """
        return super().get_receive_buffer(buf, field, origin, synchronize)

    def finalize(self):
        """ does PH.post_loops, returns Eobj """
        # NOTE: APH_main does NOT pass in extensions
        #       to APH.post_loops
        Eobj = self.opt.post_loops()
        return Eobj
class FWPHHub(PHHub):
    """ PHHub variant that runs ``self.opt.fwph_main``. """

    # The hub algorithm supplies the best bound itself.
    _hub_algo_best_bound_provider = True

    def main(self):
        """ Run the FWPH main loop without finalizing. """
        self.opt.fwph_main(finalize=False)