Skip to content

Commit 7fcc80c

Browse files
committed
#566: eliminated "distributions" that are no longer needed
1 parent e89be59 commit 7fcc80c

File tree

8 files changed

+43
-76
lines changed

8 files changed

+43
-76
lines changed

config/conf.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ check_schema: false
99
work_model:
1010
name: AffineCombination
1111
parameters:
12-
alpha: 0.0
13-
beta: 1.0
12+
alpha: 1.0
13+
beta: 0.0
1414
gamma: 0.0
1515

1616
# Specify algorithm

src/lbaf/Applications/LBAF_app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,8 @@ def run(self, cfg=None, cfg_dir=None):
587587
# Execute runtime for specified phases
588588
offline_LB_compatible = self.__parameters.json_params.get(
589589
"offline_LB_compatible", False)
590+
lb_iterations = self.__parameters.json_params.get(
591+
"lb_iterations", False)
590592
rebalanced_phase = runtime.execute(
591593
self.__parameters.algorithm.get("phase_id", 0),
592594
1 if offline_LB_compatible else 0)
@@ -620,7 +622,7 @@ def run(self, cfg=None, cfg_dir=None):
620622
else:
621623
# Add new phase when load balancing when offline mode not selected
622624
self.__logger.info(f"Creating rebalanced phase {phase_id}")
623-
self.__jsoTn_writer.write(
625+
self.__json_writer.write(
624626
{phase_id: rebalanced_phase})
625627

626628
# Generate meshes and multimedia when requested

src/lbaf/Execution/lbsAlgorithmBase.py

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -154,40 +154,8 @@ def factory(
154154
logger.error(f"Could not create an algorithm with name {algorithm_name}")
155155
raise SystemExit(1) from e
156156

157-
def _update_distributions_and_statistics(self, distributions: dict, statistics: dict):
158-
"""Compute and update run distributions and statistics."""
159-
# Create or update distributions of object quantities of interest
160-
for object_qoi_name in tuple({"load", self.__object_qoi}):
161-
if not object_qoi_name:
162-
continue
163-
try:
164-
distributions.setdefault(f"object {object_qoi_name}", []).append(
165-
{o.get_id(): getattr(o, f"get_{object_qoi_name}")()
166-
for o in self._rebalanced_phase.get_objects()})
167-
except AttributeError as err:
168-
self.__print_QOI("obj")
169-
self._logger.error(f"Invalid object_qoi name '{object_qoi_name}'")
170-
raise SystemExit(1) from err
171-
172-
# Create or update distributions of rank quantities of interest
173-
for rank_qoi_name in tuple({"objects", "load", self.__rank_qoi}):
174-
if not rank_qoi_name or rank_qoi_name == "work":
175-
continue
176-
try:
177-
distributions.setdefault(f"rank {rank_qoi_name}", []).append(
178-
[getattr(p, f"get_{rank_qoi_name}")()
179-
for p in self._rebalanced_phase.get_ranks()])
180-
except AttributeError as err:
181-
self.__print_QOI("rank")
182-
self._logger.error(f"Invalid rank_qoi name '{rank_qoi_name}'")
183-
raise SystemExit(1) from err
184-
distributions.setdefault("rank work", []).append(
185-
[self._work_model.compute(p) for p in self._rebalanced_phase.get_ranks()])
186-
187-
# Create or update distributions of edge quantities of interest
188-
distributions.setdefault("sent", []).append(dict(
189-
self._rebalanced_phase.get_edge_maxima().items()))
190-
157+
def _update_statistics(self, statistics: dict):
158+
"""Compute and update run statistics."""
191159
# Create or update statistics dictionary entries
192160
for (support, getter), stat_names in self.__statistics.items():
193161
for k, v in stat_names.items():
@@ -253,7 +221,7 @@ def _report_final_mapping(self, logger):
253221
logger.debug(
254222
f"object {k.get_id()} on rank {k.get_rank_id()}: {v}")
255223

256-
def _initialize(self, p_id, phases, distributions, statistics):
224+
def _initialize(self, p_id, phases, statistics):
257225
"""Factor out pre-execution checks and initalizations."""
258226
# Ensure that a list with at least one phase was provided
259227
if not isinstance(phases, dict) or not all(
@@ -286,16 +254,15 @@ def _initialize(self, p_id, phases, distributions, statistics):
286254
f"across {self._rebalanced_phase.get_number_of_ranks()} ranks "
287255
f"into phase {self._rebalanced_phase.get_id()}")
288256

289-
# Initialize run distributions and statistics
290-
self._update_distributions_and_statistics(distributions, statistics)
257+
# Initialize run statistics
258+
self._update_statistics(statistics)
291259

292260
@abc.abstractmethod
293-
def execute(self, p_id, phases, distributions, statistics, a_min_max):
261+
def execute(self, p_id, phases, statistics, a_min_max):
294262
"""Execute balancing algorithm on Phase instance.
295263
296264
:param: p_id: index of phase to be rebalanced (all if equal to _)
297265
:param: phases: list of Phase instances
298-
:param: distributions: dictionary of load-varying variables
299266
:param: statistics: dictionary of statistics
300267
:param: a_min_max: possibly empty list of optimal arrangements.
301268
"""

src/lbaf/Execution/lbsBruteForceAlgorithm.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, obj
6767
self._logger.info(
6868
f"Instantiated {'with' if self.__skip_transfer else 'without'} transfer stage skipping")
6969

70-
def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, _):
70+
def execute(self, p_id: int, phases: list, statistics: dict, _):
7171
"""Execute brute force optimization algorithm on phase with index p_id."""
7272
# Perform pre-execution checks and initializations
73-
self._initialize(p_id, phases, distributions, statistics)
73+
self._initialize(p_id, phases, statistics)
7474
self._logger.info("Starting brute force optimization")
7575
initial_phase = phases[min(phases.keys())]
7676
phase_ranks = initial_phase.get_ranks()
@@ -113,8 +113,8 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
113113
# Report on object transfers
114114
self._logger.info(f"{n_transfers} transfers occurred")
115115

116-
# Update run distributions and statistics
117-
self._update_distributions_and_statistics(distributions, statistics)
116+
# Update run statistics
117+
self._update_statistics(statistics)
118118

119119
# Report final mapping in debug mode
120120
self._report_final_mapping(self._logger)

src/lbaf/Execution/lbsCentralizedPrefixOptimizerAlgorithm.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,18 @@ def __init__(self, work_model, parameters: dict, lgr: Logger, qoi_name: str, obj
6363
self._phase = None
6464
self._max_shared_ids = None
6565

66-
def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, _):
66+
def execute(self, p_id: int, phases: list, statistics: dict, _):
6767
""" Execute centralized prefix memory-constrained optimizer"""
6868

6969
p_id = 0
7070

7171
# Ensure that a list with at least one phase was provided
72-
self._initialize(p_id, phases, distributions, statistics)
72+
self._initialize(p_id, phases, statistics)
7373

7474
self._phase = self._rebalanced_phase
7575

76-
# Initialize run distributions and statistics
77-
self._update_distributions_and_statistics(distributions, statistics)
76+
# Initialize run statistics
77+
self._update_statistics(statistics)
7878

7979
# Prepare input data for rank order enumerator
8080
self._logger.info("Starting optimizer")
@@ -192,8 +192,8 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
192192
f"iteration {i + 1} rank work",
193193
self._logger)
194194

195-
# Update run distributions and statistics
196-
self._update_distributions_and_statistics(distributions, statistics)
195+
# Update run statistics
196+
self._update_statistics(statistics)
197197

198198
# Report final mapping in debug mode
199199
self._report_final_mapping(self._logger)

src/lbaf/Execution/lbsInformAndTransferAlgorithm.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,10 @@ def __execute_information_stage(self):
218218
self._logger.info(
219219
f"Average number of peers known to ranks: {n_k} ({100 * n_k / n_r:.2f}% of {n_r})")
220220

221-
def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max):
221+
def execute(self, p_id: int, phases: list, statistics: dict, a_min_max):
222222
""" Execute 2-phase information+transfer algorithm on Phase with index p_id."""
223223
# Perform pre-execution checks and initializations
224-
self._initialize(p_id, phases, distributions, statistics)
224+
self._initialize(p_id, phases, statistics)
225225
print_function_statistics(
226226
self._rebalanced_phase.get_ranks(),
227227
self._work_model.compute,
@@ -244,8 +244,6 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
244244
# Start with information stage
245245
self.__execute_information_stage()
246246

247-
print(f"statistics: {statistics}")
248-
249247
# Execute transfer stage
250248
n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute(
251249
self.__known_peers, self._rebalanced_phase, statistics["average load"], statistics["maximum load"][-1])
@@ -267,8 +265,8 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
267265
f"iteration {i + 1} rank work",
268266
self._logger)
269267

270-
# Update run distributions and statistics
271-
self._update_distributions_and_statistics(distributions, statistics)
268+
# Update run statistics
269+
self._update_statistics(statistics)
272270

273271
# Compute current arrangement
274272
arrangement = tuple(sorted(

src/lbaf/Execution/lbsPhaseStepperAlgorithm.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, obj
6060
# Call superclass init
6161
super().__init__(work_model, parameters, lgr, rank_qoi, object_qoi)
6262

63-
def execute(self, _, phases: list, distributions: dict, statistics: dict, __):
63+
def execute(self, _, phases: list, statistics: dict, __):
6464
"""Steps through all phases."""
6565

6666
# Ensure that a list with at least one phase was provided
@@ -81,9 +81,8 @@ def execute(self, _, phases: list, distributions: dict, statistics: dict, __):
8181
f"phase {p_id} rank works",
8282
self._logger)
8383

84-
# Update run distributions and statistics
85-
self._update_distributions_and_statistics(
86-
distributions, statistics)
84+
# Update run statistics
85+
self._update_statistics(statistics)
8786

8887
# Report current mapping in debug mode
8988
self._report_final_mapping(self._logger)

src/lbaf/Execution/lbsRuntime.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,15 @@
5050
class Runtime:
5151
"""A class to handle the execution of the LBS."""
5252

53-
def __init__(self, phases: dict, work_model: dict, algorithm: dict, arrangements: list, logger: Logger,
54-
rank_qoi: str, object_qoi: str):
53+
def __init__(
54+
self,
55+
phases: dict,
56+
work_model: dict,
57+
algorithm: dict,
58+
arrangements: list,
59+
logger: Logger,
60+
rank_qoi: str,
61+
object_qoi: str):
5562
"""Class constructor.
5663
5764
:param phases: dictionary of Phase instances
@@ -96,9 +103,8 @@ def __init__(self, phases: dict, work_model: dict, algorithm: dict, arrangements
96103
f"Could not instantiate an algorithm of type {self.__algorithm}")
97104
raise SystemExit(1)
98105

99-
# Initialize run distributions and statistics
106+
# Initialize run statistics
100107
phase_0 = self.__phases[min(self.__phases.keys())]
101-
self.__distributions = {}
102108
l_stats = compute_function_statistics(
103109
phase_0.get_ranks(),
104110
lambda x: x.get_load())
@@ -122,10 +128,6 @@ def get_work_model(self):
122128
"""Return runtime work model."""
123129
return self.__work_model
124130

125-
def get_distributions(self):
126-
"""Return runtime distributions."""
127-
return self.__distributions
128-
129131
def get_statistics(self):
130132
"""Return runtime statistics."""
131133
return self.__statistics
@@ -139,19 +141,18 @@ def execute(self, p_id: int, phase_increment: int=0):
139141
self.__algorithm.execute(
140142
p_id,
141143
self.__phases,
142-
self.__distributions,
143144
self.__statistics,
144145
self.__a_min_max)
145146

146147
# Retrieve possibly null rebalanced phase and return it
147-
if (pp := self.__algorithm.get_rebalanced_phase()):
148+
if (lbp := self.__algorithm.get_rebalanced_phase()):
148149
# Increment rebalanced phase ID as requested
149-
pp.set_id((pp_id := pp.get_id() + phase_increment))
150+
lbp.set_id((lbp_id := lbp.get_id() + phase_increment))
150151

151152
# Share communications from original phase with new phase
152153
initial_communications = self.__algorithm.get_initial_communications()
153-
pp.set_communications(initial_communications[p_id])
154-
self.__logger.info(f"Created rebalanced phase {pp_id}")
154+
lbp.set_communications(initial_communications[p_id])
155+
self.__logger.info(f"Created rebalanced phase {lbp_id}")
155156

156157
# Return rebalanced phase
157-
return pp
158+
return lbp

0 commit comments

Comments
 (0)