Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/challenging-toy-fewer-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.0
upper_bounds:
max_memory_usage: 8.0e+9

Expand Down
1 change: 1 addition & 0 deletions config/challenging-toy-hundreds-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.0
upper_bounds:
max_memory_usage: 8000000000.0

Expand Down
1 change: 1 addition & 0 deletions config/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.0

# Specify algorithm
algorithm:
Expand Down
3 changes: 2 additions & 1 deletion config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.1
upper_bounds:
max_memory_usage: 45.0

Expand Down Expand Up @@ -44,7 +45,7 @@ visualization:
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
rank_qoi: homing
object_qoi: shared_id
save_meshes: true
force_continuous_object_qoi: true
Expand Down
1 change: 1 addition & 0 deletions docs/pages/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Example configuration
parameters:
beta: 0.
gamma: 0.
delta: 0.

# Specify balancing algorithm
algorithm:
Expand Down
1 change: 1 addition & 0 deletions docs/pages/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Synthetic Blocks Test Configuration
parameters:
beta: 0.
gamma: 0.
delta: 0.

# Specify balancing algorithm
algorithm:
Expand Down
5 changes: 5 additions & 0 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ def __print_statistics(self, phase: Phase, phase_name: str, work_model: WorkMode
f"{phase_name} node maximum memory usage",
self.__logger)
if r_shared_mem_stats.get_maximum():
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_homing(),
f"{phase_name} homing cost",
self.__logger)
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_homed_blocks_ratio(),
Expand Down
2 changes: 2 additions & 0 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger):
self.__statistics = {
("ranks", lambda x: x.get_load()): {
"maximum load": "maximum"},
("ranks", lambda x: self._work_model.compute(x)): {
"maximum work": "maximum"},
("ranks", lambda x: self._work_model.compute(x)): {
"total work": "sum"}}

Expand Down
7 changes: 4 additions & 3 deletions src/lbaf/Execution/lbsBruteForceAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ def execute(self, p_id: int, phases: list, statistics: dict):
n_ranks = len(phase_ranks)
affine_combination = isinstance(
self._work_model, AffineCombinationWorkModel)
alpha, beta, gamma = [
alpha, beta, gamma, delta = [
self._work_model.get_alpha() if affine_combination else 1.0,
self._work_model.get_beta() if affine_combination else 0.0,
self._work_model.get_gamma() if affine_combination else 0.0]
self._work_model.get_gamma() if affine_combination else 0.0,
self._work_model.get_delta() if affine_combination else 0.0]
_n_a, _w_min_max, a_min_max = compute_min_max_arrangements_work(
objects, alpha, beta, gamma, n_ranks,
objects, alpha, beta, gamma, delta, n_ranks,
logger=self._logger)

# Skip object transfers when requested
Expand Down
6 changes: 2 additions & 4 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,10 @@ def execute(self, p_id: int, phases: list, statistics: dict):
# Set phase to be used by transfer criterion
self.__transfer_criterion.set_phase(self._rebalanced_phase)

# Retrieve total work from computed statistics
total_work = statistics["total work"][-1]

# Perform requested number of load-balancing iterations
s_name = "maximum work"
for i in range(self.__n_iterations):
self._logger.info(f"Starting iteration {i + 1} with total work of {total_work}")
self._logger.info(f"Starting iteration {i + 1} with {s_name} of {statistics[s_name][-1]:.6g}")

# Time the duration of each iteration
start_time = time.time()
Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsTransferStrategyBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def _initialize_transfer_stage(self, ave_load: float):
"""Initialize transfer stage consistently across strategies."""

# Keep track of average load
self._logger.info(f"Executing transfer phase with average load: {ave_load}")
self._average_load = ave_load
self._logger.info(f"Executing transfer phase with average load: {self._average_load}")

# Initialize numbers of transfers and rejects
self._n_transfers = 0
Expand Down
1 change: 1 addition & 0 deletions src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"parameters": {
"beta": float,
"gamma": float,
Optional("delta"): float,
Optional("upper_bounds"): And(
dict,
lambda x: all(isinstance(y, float) for y in x.values()))}},
Expand Down
58 changes: 40 additions & 18 deletions src/lbaf/IO/lbsStatistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ def compute_load(objects: tuple, rank_object_ids: list) -> float:
return sum(objects[i].get_load() for i in rank_object_ids)


def compute_arrangement_works(objects: tuple, arrangement: tuple, alpha: float, beta: float, gamma: float) -> dict:
def compute_arrangement_works(
objects: tuple, arrangement: tuple,
alpha: float, beta: float, gamma: float, delta: float,
logger: Optional[Logger] = None) -> dict:
"""Return a dictionary with works of rank objects."""
# Build object rank map from arrangement
ranks = {}
Expand All @@ -229,8 +232,14 @@ def compute_arrangement_works(objects: tuple, arrangement: tuple, alpha: float,

# Do not calculate communications if beta = 0 for better performance
if beta > 0.0:
works[rank] += beta * max(compute_volume(objects, rank_objects, "received"),
compute_volume(objects, rank_objects, "sent"))
works[rank] += beta * max(
compute_volume(objects, rank_objects, "received"),
compute_volume(objects, rank_objects, "sent"))

# Homing cost not calculated yet
if delta > 0.0 and logger is not None:
logger.error("Delta homing cost not calculated yet")
raise SystemExit(1)

# Add constant
works[rank] += gamma
Expand All @@ -239,16 +248,20 @@ def compute_arrangement_works(objects: tuple, arrangement: tuple, alpha: float,
return works


def compute_min_max_arrangements_work(objects: tuple, alpha: float, beta: float, gamma: float, n_ranks: int,
sanity_checks=True, logger: Optional[Logger] = None):
def compute_min_max_arrangements_work(
objects: tuple,
alpha: float, beta: float, gamma: float, delta: float,
n_ranks: int,
sanity_checks=True, logger: Optional[Logger] = None):
"""Compute all possible arrangements with repetition and minimax work."""
# Initialize quantities of interest
n_arrangements = 0
works_min_max = math.inf
arrangements_min_max = []
for arrangement in itertools.product(range(n_ranks), repeat=len(objects)):
# Compute per-rank works for current arrangement
works = compute_arrangement_works(objects, arrangement, alpha, beta, gamma)
works = compute_arrangement_works(
objects, arrangement, alpha, beta, gamma, delta)

# Update minmax when relevant
work_max = max(works.values())
Expand Down Expand Up @@ -281,7 +294,8 @@ def compute_min_max_arrangements_work(objects: tuple, alpha: float, beta: float,


def compute_pairwise_reachable_arrangements(
objects: tuple, arrangement: tuple, alpha: float, beta: float, gamma: float,
objects: tuple, arrangement: tuple,
alpha: float, beta: float, gamma: float, delta: float,
w_max: float, from_id: int, to_id: int, n_ranks: int,
max_objects: Optional[int] = None, logger: Optional[Logger] = None):
"""Compute arrangements reachable by moving up to a given maximum number of objects."""
Expand Down Expand Up @@ -312,8 +326,10 @@ def compute_pairwise_reachable_arrangements(
for c in itertools.combinations(matches, n):
# Change all corresponding entries
n_possible += 1
new_arrangement = tuple(to_id if i in c else r for i, r in enumerate(arrangement))
works = compute_arrangement_works(objects, new_arrangement, alpha, beta, gamma)
new_arrangement = tuple(
to_id if i in c else r for i, r in enumerate(arrangement))
works = compute_arrangement_works(
objects, new_arrangement, alpha, beta, gamma, delta)

# Check whether new arrangements is reachable
w_max_new = max(works.values())
Expand Down Expand Up @@ -441,8 +457,10 @@ def print_subset_statistics(subset_name, subset_size, set_name, set_size, logger
ss = f"{100. * subset_size / set_size:.3g}" if set_size else ''
logger.info(f"{subset_name}: {subset_size:.6g} amongst {set_size:.6g} {set_name} ({ss}%)")

def compute_all_reachable_arrangements(objects: tuple, arrangement: tuple, alpha: float, beta: float, gamma: float,
w_max: float, n_ranks: int, logger: Logger, max_objects: int = None):
def compute_all_reachable_arrangements(
objects: tuple, arrangement: tuple,
alpha: float, beta: float, gamma: float, delta: float,
w_max: float, n_ranks: int, logger: Logger, max_objects: int = None):
"""Compute all arrangements reachable by moving up to a maximum number of objects."""

logger.warning("Old function. Used by the removed Rank Object Enumerator script")
Expand All @@ -456,8 +474,10 @@ def compute_all_reachable_arrangements(objects: tuple, arrangement: tuple, alpha
for to_id in range(n_ranks):
if from_id == to_id:
continue
reachable.update(compute_pairwise_reachable_arrangements(objects, arrangement, alpha, beta, gamma, w_max,
from_id, to_id, n_ranks, max_objects, logger))
reachable.update(compute_pairwise_reachable_arrangements(
objects, arrangement,
alpha, beta, gamma, delta,
w_max,from_id, to_id, n_ranks, max_objects, logger))
logger.info(
f"Found {len(reachable)} reachable arrangements, with maximum work: {max(reachable.values())}:")
for k, v in reachable.items():
Expand All @@ -467,9 +487,11 @@ def compute_all_reachable_arrangements(objects: tuple, arrangement: tuple, alpha
return reachable


def recursively_compute_transitions(stack: list, visited: dict, objects: tuple, arrangement: tuple, alpha: float,
beta: float, gamma: float, w_max: float, w_min_max: float, n_ranks: int,
logger: Logger, max_objects: Optional[int] = None):
def recursively_compute_transitions(
stack: list, visited: dict, objects: tuple, arrangement: tuple,
alpha: float, beta: float, gamma: float, delta: float,
w_max: float, w_min_max: float, n_ranks: int,
logger: Logger, max_objects: Optional[int] = None):
"""Recursively compute all possible transitions to reachable arrangements from initial one."""

logger.warning("Old function. Used by the removed Rank Object Enumerator script")
Expand All @@ -494,7 +516,7 @@ def recursively_compute_transitions(stack: list, visited: dict, objects: tuple,
reachable = compute_all_reachable_arrangements(
objects,
arrangement,
alpha, beta, gamma,
alpha, beta, gamma, delta,
w_max,
n_ranks,
max_objects)
Expand All @@ -512,7 +534,7 @@ def recursively_compute_transitions(stack: list, visited: dict, objects: tuple,
visited,
objects,
k,
alpha, beta, gamma,
alpha, beta, gamma, delta,
w_max, w_min_max,
n_ranks,
max_objects)
Expand Down
20 changes: 13 additions & 7 deletions src/lbaf/Model/lbsAffineCombinationWorkModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,23 @@ class AffineCombinationWorkModel(WorkModelBase):
def __init__(self, parameters, lgr: Logger):
"""Class constructor:

parameters: dictionary with alpha, beta, and gamma values.
parameters: dictionary with alpha, beta, gamma and delta values.
"""
# Assign logger to instance variable
self.__logger = lgr

# Use default values if parameters not provided
self.__beta = parameters.get("beta", 0.0)
self.__gamma = parameters.get("gamma", 0.0)
self.__delta = parameters.get("delta", 0.0)
self.__upper_bounds = parameters.get("upper_bounds", {})
self.__node_bounds = parameters.get("node_bounds", False)

# Call superclass init
super().__init__(parameters)
self.__logger.info(
"Instantiated work model with: "
f"beta={self.__beta}, gamma={self.__gamma}")
f"beta={self.__beta}, gamma={self.__gamma}, delta={self.__delta}")
for k, v in self.__upper_bounds.items():
self.__logger.info(
f"Upper bound for {'node' if self.__node_bounds else 'rank'} {k}: {v}")
Expand All @@ -82,14 +83,18 @@ def get_gamma(self):
"""Get the gamma parameter."""
return self.__gamma

def affine_combination(self, a, l, v1, v2):
"""Compute affine combination of load and maximum volume."""
return a * l + self.__beta * max(v1, v2) + self.__gamma
def get_delta(self):
    """Return the delta coefficient stored on this work model.

    In this model delta is the factor applied to the homing term of
    the affine combination.
    """
    delta = self.__delta
    return delta

def affine_combination(self, a, l, v1, v2, h=0.0):
    """Compute affine combination of load, maximum volume, and homing cost.

    Returns ``a * l + beta * max(v1, v2) + gamma + delta * h`` where
    beta, gamma and delta are the coefficients stored on this model.

    a: multiplicative coefficient applied to the load
    l: load value
    v1, v2: two volumes, only the larger of which contributes
    h: homing cost; defaults to 0.0 so that callers using the earlier
       four-argument form keep working and get no homing term
    """
    # Only the larger of the two volumes enters the combination
    max_volume = max(v1, v2)
    return a * l + self.__beta * max_volume + self.__gamma + self.__delta * h

def compute(self, rank: Rank):
"""A work model with affine combination of load and communication.

alpha * load + beta * max(sent, received) + gamma,
alpha * load + beta * max(sent, received) + gamma + delta * homing,
under optional strict upper bounds.
"""
# Check whether strict bounds are satisfied
Expand All @@ -104,4 +109,5 @@ def compute(self, rank: Rank):
rank.get_alpha(),
rank.get_load(),
rank.get_received_volume(),
rank.get_sent_volume())
rank.get_sent_volume(),
rank.get_homing())
3 changes: 2 additions & 1 deletion src/lbaf/Model/lbsPhase.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ def populate_from_samplers(self, n_ranks, n_objects, t_sampler, v_sampler, c_deg
raise SystemExit(1)

# Compute and report communication volume statistics
print_function_statistics(v_sent, lambda x: x, "communication volumes", self.__logger)
print_function_statistics(
v_sent, lambda x: x, "communication volumes", self.__logger)

# Create given number of ranks
self.__ranks = [Rank(self.__logger, r_id) for r_id in range(n_ranks)]
Expand Down
23 changes: 16 additions & 7 deletions src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ def get_number_of_homed_blocks(self) -> float:
b.get_home_id() == self.get_id()
for b in self.get_shared_blocks())

@qoi
def get_homing(self) -> float:
    """Return homing cost on rank.

    The cost is the summed size of every shared block on this rank
    whose home id differs from this rank's id.
    """
    # Sizes of the shared blocks that are not homed on this rank
    foreign_block_sizes = (
        block.get_size()
        for block in self.get_shared_blocks()
        if self.get_id() != block.get_home_id())

    # Adding to 0.0 keeps the result a float even when nothing is summed
    return 0.0 + sum(foreign_block_sizes)

@qoi
def get_number_of_uprooted_blocks(self) -> float:
"""Return number of uprooted memory blocks on rank."""
Expand Down Expand Up @@ -285,22 +293,21 @@ def get_alpha(self) -> float:
def get_load(self) -> float:
    """Return total load on rank.

    The total is the sum of get_load() over the union of the rank's
    migratable and sentinel objects.
    """
    # Objects in both sets are counted once thanks to the set union
    all_objects = self.__migratable_objects.union(self.__sentinel_objects)

    # Adding to 0.0 keeps the result a float even when nothing is summed
    return 0.0 + sum(o.get_load() for o in all_objects)

@qoi
def get_migratable_load(self) -> float:
    """Return migratable load on rank.

    This is the sum of get_load() over the rank's migratable objects.
    """
    # Adding to 0.0 keeps the result a float even when nothing is summed
    return 0.0 + sum(o.get_load() for o in self.__migratable_objects)

@qoi
def get_sentinel_load(self) -> float:
    """Return sentinel load on rank.

    This is the sum of get_load() over the rank's sentinel objects.
    """
    # Adding to 0.0 keeps the result a float even when nothing is summed
    return 0.0 + sum(o.get_load() for o in self.__sentinel_objects)

@qoi
def get_received_volume(self) -> float:
Expand All @@ -314,7 +321,9 @@ def get_received_volume(self) -> float:
continue

# Add total volume received from non-local objects
volume += sum(v for k, v in o.get_communicator().get_received().items() if k not in obj_set)
volume += sum(
v for k, v in o.get_communicator().get_received().items()
if k not in obj_set)

# Return computed volume
return volume
Expand Down