Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions config/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ check_schema: false
work_model:
name: AffineCombination
parameters:
alpha: 0.0
beta: 1.0
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
brute_force_optimization: true
algorithm:
name: InformAndTransfer
phase_id: 0
Expand All @@ -35,14 +36,4 @@ write_JSON:
suffix: json
communications: true
offline_LB_compatible: false
# visualization:
# x_ranks: 2
# y_ranks: 2
# z_ranks: 1
# object_jitter: 0.5
# rank_qoi: load
# object_qoi: load
# save_meshes: true
# force_continuous_object_qoi: true
# output_visualization_dir: ../output
# output_visualization_file_stem: output_file
lb_iterations: true
7 changes: 4 additions & 3 deletions config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ work_model:
gamma: 0.0

# Specify algorithm
brute_force_optimization: true
algorithm:
name: InformAndTransfer
phase_id: 0
Expand Down Expand Up @@ -43,7 +44,7 @@ visualization:
output_visualization_file_stem: output_file

write_JSON:
compressed: False
compressed: false
suffix: json
communications: True
offline_LB_compatible: True
communications: true
offline_LB_compatible: true
33 changes: 16 additions & 17 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,12 @@
raise SystemExit(1) from e

# Retrieve optional parameters
self.json_params[
"json_output_suffix"] = wrt_json.get("suffix", "json")
self.json_params[
"communications"] = wrt_json.get("communications", False)
self.json_params[
"offline_LB_compatible"] = wrt_json.get(
"offline_LB_compatible", False)
for k_out, k_wrt, v_def in [
("json_output_suffix", "suffix", "json"),
("communications", "communications", False),
("offline_LB_compatible", "offline_LB_compatible", False),
("lb_iterations", "lb_iterations", False)]:
self.json_params[k_out] = wrt_json.get(k_wrt, v_def)

def check_parameters(self):
"""Checks after initialization."""
Expand Down Expand Up @@ -412,7 +411,7 @@
# Return rank load and work statistics
return l_stats, w_stats

def __print_QOI(self) -> int: # pylint:disable=C0103:invalid-name
def __print_qoi(self) -> int:
"""Print list of implemented QOI based on the '-verbosity' command line argument."""
verbosity = int(self.__args.verbose)

Expand Down Expand Up @@ -463,7 +462,7 @@
self.__parse_args()

# Print list of implemented QOI (according to verbosity argument)
self.__print_QOI()
self.__print_qoi()

# Warn if default configuration is used because not set as argument
if self.__args.configuration is None:
Expand Down Expand Up @@ -568,8 +567,7 @@
objects = initial_phase.get_objects()
alpha, beta, gamma = [
self.__parameters.work_model.get("parameters", {}).get(k)
for k in ("alpha", "beta", "gamma")
]
for k in ("alpha", "beta", "gamma")]
_n_a, _w_min_max, a_min_max = lbstats.compute_min_max_arrangements_work(
objects, alpha, beta, gamma, n_ranks, logger=self.__logger)
else:
Expand All @@ -581,16 +579,16 @@
self.__parameters.work_model,
self.__parameters.algorithm,
a_min_max,
self.__logger,
self.__parameters.rank_qoi if self.__parameters.rank_qoi is not None else '',
self.__parameters.object_qoi if self.__parameters.object_qoi is not None else '')
self.__logger)

# Execute runtime for specified phases
offline_LB_compatible = self.__parameters.json_params.get( # pylint:disable=C0103:invalid-name;not lowercase
offline_LB_compatible = self.__parameters.json_params.get(
"offline_LB_compatible", False)
lb_iterations = self.__parameters.json_params.get(

Check warning on line 587 in src/lbaf/Applications/LBAF_app.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused variable 'lb_iterations' (unused-variable)
"lb_iterations", False)
rebalanced_phase = runtime.execute(
self.__parameters.algorithm.get("phase_id", 0),
offline_LB_compatible)
1 if offline_LB_compatible else 0)

# Instantiate phase to VT file writer when requested
if self.__json_writer:
Expand Down Expand Up @@ -619,7 +617,8 @@
f"Writing all ({len(phases)}) phases for offline load-balancing")
self.__json_writer.write(phases)
else:
self.__logger.info(f"Writing single phase {phase_id} to JSON files")
# Add new phase after load balancing when offline mode is not selected
self.__logger.info(f"Creating rebalanced phase {phase_id}")
self.__json_writer.write(
{phase_id: rebalanced_phase})

Expand Down
123 changes: 14 additions & 109 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
#@HEADER
#
import abc
import os

Check warning on line 44 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused import os (unused-import)
from typing import Set

from ..IO.lbsStatistics import compute_function_statistics
Expand All @@ -58,13 +58,11 @@
_work_model: WorkModelBase
_logger: Logger

def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger, rank_qoi: str, object_qoi: str):
def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger):
"""Class constructor.

:param work_model: a WorkModelBase instance
:param parameters: a dictionary of parameters
:param rank_qoi: rank QOI to track
:param object_qoi: object QOI to track.
"""
# Assert that a logger instance was passed
if not isinstance(logger, Logger):
Expand All @@ -83,40 +81,18 @@
self._logger.error("Could not create an algorithm without a dictionary of parameters")
raise SystemExit(1)

# Assert that quantity of interest names are string
if rank_qoi and not isinstance(rank_qoi, str):
self._logger.error("Could not create an algorithm with non-string rank QOI name")
raise SystemExit(1)
self.__rank_qoi = rank_qoi
if object_qoi and not isinstance(object_qoi, str):
self._logger.error("Could not create an algorithm with non-string object QOI name")
raise SystemExit(1)
self.__object_qoi = object_qoi
self._logger.info(
f"Created base algorithm tracking rank {rank_qoi} and object {object_qoi}")

# Initially no phase is assigned for processing
self._rebalanced_phase = None

# Save the initial communications data
self._initial_communications = {}

# Map global statistical QOIs to their computation methods
# Map rank statistics to their respective computation methods
self.__statistics = {
("ranks", lambda x: x.get_load()): {
"minimum load": "minimum",
"maximum load": "maximum",
"load variance": "variance",
"load imbalance": "imbalance"},
("largest_volumes", lambda x: x): {
"number of communication edges": "cardinality",
"maximum largest directed volume": "maximum",
"total largest directed volume": "sum"},
("ranks", lambda x: self._work_model.compute(x)): { #pylint:disable=W0108
"minimum work": "minimum",
"maximum work": "maximum",
"total work": "sum",
"work variance": "variance"}}
"maximum load": "maximum"},
("ranks", lambda x: self._work_model.compute(x)): {

Check warning on line 94 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Lambda may not be necessary (unnecessary-lambda)
"total work": "sum"}}

def get_rebalanced_phase(self):
"""Return phase assigned for processing by algorithm."""
Expand All @@ -131,9 +107,7 @@
algorithm_name:str,
parameters: dict,
work_model: WorkModelBase,
logger: Logger,
rank_qoi: str,
object_qoi:str):
logger: Logger):
"""Instantiate the necessary concrete algorithm."""
# Load up available algorithms
# pylint:disable=W0641:possibly-unused-variable,C0415:import-outside-toplevel
Expand All @@ -148,90 +122,22 @@
try:
# Instantiate and return object
algorithm = locals()[algorithm_name + "Algorithm"]
return algorithm(work_model, parameters, logger, rank_qoi, object_qoi)
return algorithm(work_model, parameters, logger)
except Exception as e:
# Otherwise, error out
logger.error(f"Could not create an algorithm with name {algorithm_name}")
raise SystemExit(1) from e

def _update_distributions_and_statistics(self, distributions: dict, statistics: dict):
"""Compute and update run distributions and statistics."""
# Create or update distributions of object quantities of interest
for object_qoi_name in tuple({"load", self.__object_qoi}):
if not object_qoi_name:
continue
try:
distributions.setdefault(f"object {object_qoi_name}", []).append(
{o.get_id(): getattr(o, f"get_{object_qoi_name}")()
for o in self._rebalanced_phase.get_objects()})
except AttributeError as err:
self.__print_QOI("obj")
self._logger.error(f"Invalid object_qoi name '{object_qoi_name}'")
raise SystemExit(1) from err

# Create or update distributions of rank quantities of interest
for rank_qoi_name in tuple({"objects", "load", self.__rank_qoi}):
if not rank_qoi_name or rank_qoi_name == "work":
continue
try:
distributions.setdefault(f"rank {rank_qoi_name}", []).append(
[getattr(p, f"get_{rank_qoi_name}")()
for p in self._rebalanced_phase.get_ranks()])
except AttributeError as err:
self.__print_QOI("rank")
self._logger.error(f"Invalid rank_qoi name '{rank_qoi_name}'")
raise SystemExit(1) from err
distributions.setdefault("rank work", []).append(
[self._work_model.compute(p) for p in self._rebalanced_phase.get_ranks()])

# Create or update distributions of edge quantities of interest
distributions.setdefault("sent", []).append(dict(
self._rebalanced_phase.get_edge_maxima().items()))

def _update_statistics(self, statistics: dict):
    """Compute and append the current statistics of the rebalanced phase.

    For every (support, getter) entry of the internal statistics map, the
    getter is evaluated over the population returned by the rebalanced
    phase's ``get_<support>()`` accessor, and each requested statistic is
    appended to the list stored under its name in ``statistics``.

    :param statistics: dictionary mapping statistic names to lists of
        values; missing entries are created, existing ones are extended.
    """
    for (support, getter), stat_names in self.__statistics.items():
        # The statistics object is built from the support and getter only,
        # so compute it once per entry instead of once per statistic name
        # NOTE(review): assumes compute_function_statistics is free of
        # side effects -- confirm against lbsStatistics
        stats = compute_function_statistics(
            getattr(self._rebalanced_phase, f"get_{support}")(), getter)

        # Append each requested statistic under its reporting name
        for k, v in stat_names.items():
            self._logger.debug(f"Updating {k} statistics for {support}")
            statistics.setdefault(k, []).append(getattr(stats, f"get_{v}")())

def __print_QOI(self, rank_or_obj):  # pylint:disable=invalid-name
    """Log, at error level, the implemented QOI names after an invalid QOI was given.

    QOI names are discovered by scanning a script in the ``Model`` directory
    located one level above the directory containing this file.

    :param rank_or_obj: "rank" to list rank QOI found in ``lbsRank.py``
        (always preceded by "work"), "obj" to list object QOI found in
        ``lbsObject.py``; any other value results in no output.
    """
    # Select the script to scan, the label used in the log header and the
    # QOI names that are listed regardless of the script contents
    if rank_or_obj == "rank":
        script_name, label, qoi_list = "lbsRank.py", "Rank", ["work"]
    elif rank_or_obj == "obj":
        script_name, label, qoi_list = "lbsObject.py", "Object", []
    else:
        # Unknown kind: nothing to report and no file is opened
        return

    # Initialize file paths
    current_path = os.path.abspath(__file__)
    target_dir = os.path.join(
        os.path.dirname(os.path.dirname(current_path)), "Model")

    # Keep the text between a "get_" found at columns 8-11 and the first "("
    # (presumably matching "def get_<name>(" at method indentation -- verify
    # against the Model scripts); stream the file instead of loading it whole
    with open(os.path.join(target_dir, script_name), 'r', encoding="utf-8") as f:
        for line in f:
            if line[8:12] == "get_":
                qoi_list.append(line[12:line.find("(")])

    # Report every collected QOI name on its own tab-indented line
    self._logger.error(f"List of all possible {label} QOI:")
    for qoi in qoi_list:
        self._logger.error("\t" + qoi)

def _report_final_mapping(self, logger):
"""Report final rank object mapping in debug mode."""
for rank in self._rebalanced_phase.get_ranks():
Expand All @@ -253,7 +159,7 @@
logger.debug(
f"object {k.get_id()} on rank {k.get_rank_id()}: {v}")

def _initialize(self, p_id, phases, distributions, statistics):
def _initialize(self, p_id, phases, statistics):
"""Factor out pre-execution checks and initializations."""
# Ensure that a list with at least one phase was provided
if not isinstance(phases, dict) or not all(
Expand Down Expand Up @@ -286,16 +192,15 @@
f"across {self._rebalanced_phase.get_number_of_ranks()} ranks "
f"into phase {self._rebalanced_phase.get_id()}")

# Initialize run distributions and statistics
self._update_distributions_and_statistics(distributions, statistics)
# Initialize run statistics
self._update_statistics(statistics)

@abc.abstractmethod
def execute(self, p_id, phases, distributions, statistics, a_min_max):
def execute(self, p_id, phases, statistics, a_min_max):
"""Execute balancing algorithm on Phase instance.

:param: p_id: index of phase to be rebalanced (all if equal to _)
:param: phases: list of Phase instances
:param: distributions: dictionary of load-varying variables
:param: statistics: dictionary of statistics
:param: a_min_max: possibly empty list of optimal arrangements.
"""
14 changes: 6 additions & 8 deletions src/lbaf/Execution/lbsBruteForceAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,24 @@
class BruteForceAlgorithm(AlgorithmBase):
"""A concrete class for the brute force optimization algorithm"""

def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, object_qoi: str):
def __init__(self, work_model, parameters: dict, lgr: Logger):
"""Class constructor.

:param work_model: a WorkModelBase instance
:param parameters: a dictionary of parameters
:param rank_qoi: rank QOI to track
:param object_qoi: object QOI to track.
"""
# Call superclass init
super().__init__(work_model, parameters, lgr, rank_qoi, object_qoi)
super().__init__(work_model, parameters, lgr)

# Assign optional parameters
self.__skip_transfer = parameters.get("skip_transfer", False)
self._logger.info(
f"Instantiated {'with' if self.__skip_transfer else 'without'} transfer stage skipping")

def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, _):
def execute(self, p_id: int, phases: list, statistics: dict, _):
"""Execute brute force optimization algorithm on phase with index p_id."""
# Perform pre-execution checks and initializations
self._initialize(p_id, phases, distributions, statistics)
self._initialize(p_id, phases, statistics)
self._logger.info("Starting brute force optimization")
initial_phase = phases[min(phases.keys())]
phase_ranks = initial_phase.get_ranks()
Expand Down Expand Up @@ -113,8 +111,8 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
# Report on object transfers
self._logger.info(f"{n_transfers} transfers occurred")

# Update run distributions and statistics
self._update_distributions_and_statistics(distributions, statistics)
# Update run statistics
self._update_statistics(statistics)

# Report final mapping in debug mode
self._report_final_mapping(self._logger)
Loading
Loading