Skip to content
Draft
6 changes: 3 additions & 3 deletions config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ check_schema: true
work_model:
name: AffineCombination
parameters:
beta: 0.0
beta: 1000.0
gamma: 0.0
delta: 0.1
delta: 0.0
upper_bounds:
max_memory_usage: 45.0

Expand All @@ -27,7 +27,7 @@ algorithm:
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 4
criterion: Tempered
criterion: TemperedWithUpdates
max_objects_per_transfer: 8
deterministic_transfer: true

Expand Down
1 change: 1 addition & 0 deletions src/lbaf/Execution/lbsCriterionBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def factory(criterion_name: str, work_model: WorkModelBase, logger: Logger):
# Load up available criteria
# pylint:disable=W0641:possibly-unused-variable,C0415:import-outside-toplevel
from .lbsTemperedCriterion import TemperedCriterion
from .lbsTemperedWithUpdatesCriterion import TemperedWithUpdatesCriterion
from .lbsStrictLocalizingCriterion import StrictLocalizingCriterion
# pylint:enable=W0641:possibly-unused-variable,C0415:import-outside-toplevel

Expand Down
93 changes: 93 additions & 0 deletions src/lbaf/Execution/lbsTemperedWithUpdatesCriterion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
#@HEADER
###############################################################################
#
# lbsTemperedWithUpdatesCriterion.py
# DARMA/LB-analysis-framework => LB Analysis Framework
#
# Copyright 2019-2024 National Technology & Engineering Solutions of Sandia, LLC
# (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
# Government retains certain rights in this software.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Questions? Contact [email protected]
#
###############################################################################
#@HEADER
#
from logging import Logger
from typing import Optional

from .lbsCriterionBase import CriterionBase
from ..Model.lbsRank import Rank


class TemperedWithUpdatesCriterion(CriterionBase):
"""A concrete class for the Grapevine criterion with update formulae."""

def __init__(self, work_model, lgr: Logger):
"""Class constructor."""
# Call superclass init
super().__init__(work_model, lgr)
self._logger.info(f"Instantiated {type(self).__name__} concrete criterion")

def compute(self, r_src: Rank, o_src: list, r_dst: Rank, o_dst: Optional[list]=None) -> float:
"""Tempered work criterion based on L1 norm of works using update formulae."""
if o_dst is None:
o_dst = []

# Compute maximum work of original arrangement
w_max_0 = max(
self._work_model.compute(r_src),
self._work_model.compute(r_dst))

# Compute update formulae
w_max_up = max(
w1 := self._work_model.update(r_src, o_src, o_dst),
w2 := self._work_model.update(r_dst, o_dst, o_src))

# Move objects into proposed new arrangement
self._phase.transfer_objects(r_src, o_src, r_dst, o_dst)

# Compute maximum work of proposed new arrangement
w_max_new = max(
w3 := self._work_model.compute(r_src),
w4 := self._work_model.compute(r_dst))

# Move objects back into original arrangement
self._phase.transfer_objects(r_dst, o_src, r_src, o_dst)

# Sanity check
if w_max_new != w_max_up:
self._logger.error(f"Discrepancy in post update works: {w_max_new} <> {w_max_up}")
print(w1, w3)
print(w2, w4)
raise SystemExit(1)

# Return criterion value
return w_max_0 - w_max_new
2 changes: 1 addition & 1 deletion src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"CentralizedPrefixOptimizer",
"PrescribedPermutation",
"PhaseStepper")
ALLOWED_CRITERIA = ("Tempered", "StrictLocalizing")
ALLOWED_CRITERIA = ("Tempered", "TemperedWithUpdates", "StrictLocalizing")
ALLOWED_LOGGING_LEVELS = ("info", "debug", "warning", "error")
ALLOWED_LOAD_VOLUME_SAMPLER = ("uniform", "lognormal")

Expand Down
171 changes: 170 additions & 1 deletion src/lbaf/Model/lbsAffineCombinationWorkModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def affine_combination(self, a, l, v1, v2, h):

def compute(self, rank: Rank):
"""A work model with affine combination of load and communication.

alpha * load + beta * max(sent, received) + gamma + delta * homing,
under optional strict upper bounds.
"""
Expand All @@ -111,3 +110,173 @@ def compute(self, rank: Rank):
rank.get_received_volume(),
rank.get_sent_volume(),
rank.get_homing())

def __update_load(self, rank: Rank, o_snd: list, o_rcv: list):
"""Update total load if objects are to be sent and received."""
return rank.get_load() - sum(
o.get_load() for o in o_snd) + sum(
o.get_load() for o in o_rcv)

def __update_received(self, rank: Rank, o_snd: list, o_rcv: list):
"""Update received volume if objects are to be sent and received."""
# Keep track of rank id and objects
r_id = rank.get_id()
r_obj = rank.get_objects().copy()

# Retrieve current received volume
volume = rank.get_received_volume()

# Iterate over all sent objects
for o in o_snd:
# Skip non-communicating objects
if not (c := o.get_communicator()):
continue

# Subtract communications received by object from other ranks
for k, v in c.get_received().items():
if k not in r_obj:
volume -= v

# Add communications sent from object to current rank
for k, v in c.get_sent().items():
if k in r_obj:
volume += v

# Remove object from rank
r_obj.discard(o)

# Iterate over all received objects
for o in o_rcv:
# Skip non-communicating objects
if not (c := o.get_communicator()):
continue

# Add communications received by object from other ranks
for k, v in c.get_received().items():
if k not in r_obj:
volume += v

# Subtract communications sent from object to current rank
for k, v in c.get_sent().items():
if k in r_obj:
volume -= v

# Add object to rank
r_obj.add(o)

# Return updated received volume
return volume

def __update_sent(self, rank: Rank, o_snd: list, o_rcv: list):
"""Update sent volume if objects are to be sent and received."""
# Keep track of rank id and objects
r_id = rank.get_id()
r_obj = rank.get_objects().copy()

# Retrieve current sent volume
volume = rank.get_sent_volume()

# Iterate over all sent objects
for o in o_snd:
# Skip non-communicating objects
if not (c := o.get_communicator()):
continue

# Subtract communications sent from object to other ranks
for k, v in c.get_sent().items():
if k not in r_obj:
volume -= v

# Add communications received by object from current rank
for k, v in c.get_received().items():
if k in r_obj:
volume += v

# Remove object from rank
r_obj.discard(o)

# Iterate over all received objects
for o in o_rcv:
# Skip non-communicating objects
if not (c := o.get_communicator()):
continue

# Add communications sent from object to other ranks
for k, v in c.get_sent().items():
if k not in r_obj:
volume += v

# Subtract communications received by object from current rank
for k, v in c.get_received().items():
if k in r_obj:
volume -= v

# Add object to rank
r_obj.add(o)

# Return updated sent volume
return volume

def __update_homing(self, rank: Rank, o_snd: list, o_rcv: list):
"""Update homing costs if objects are to be sent and received."""
# Keep track of rank id and objects
r_id = rank.get_id()
r_obj = rank.get_objects().copy()

# Retrieve current homing cost
homing = rank.get_homing()

# Iterate over all sent objects
for o in o_snd:
# Remove object from rank
r_obj.discard(o)

# Skip locally homed blocks
b = o.get_shared_block()
if b.get_home_id() == r_id:
continue

# Determine set of removed non-homed blocks
S = set({b})
for o_oth in r_obj:
if o_oth.get_shared_block() == b:
S = set()
break

# Update homing cost
for b in S:
homing -= b.get_size()

# Iterate over all received objects
for o in o_rcv:
# Skip locally homed blocks
b = o.get_shared_block()
if b.get_home_id() == r_id:
continue

# Determine set of added non-homed blocks
S = set({b})
for o_oth in r_obj:
if o_oth.get_shared_block() == b:
S = set()
break

# Update homing cost
for b in S:
homing += b.get_size()

# Addd object to rank
r_obj.add(o)

# Return updated homing cost
return homing

def update(self, rank: Rank, o_snd: list, o_rcv: list):
"""Update work if objects are to be sent and received."""
# Return combination of load and volumes
return self.affine_combination(
rank.get_alpha(),
self.__update_load(rank, o_snd, o_rcv),
self.__update_received(rank, o_snd, o_rcv),
self.__update_sent(rank, o_snd, o_rcv),
self.__update_homing(rank, o_snd, o_rcv))
9 changes: 7 additions & 2 deletions src/lbaf/Model/lbsLoadOnlyWorkModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def __init__(self, _, lgr: Logger):
self.__logger.info("Instantiated concrete work model")

def compute(self, rank: Rank):
"""A work model summing all object loads on given rank."""
# Return total load on this rank
"""This work model only considers total object load."""
return rank.get_load()

def update(self, rank: Rank, o_snd: list, o_rcv: list):
"""Update total load if objects are to be sent and received."""
return rank.get_load() + sum(
o.get_load() for o in o_rcv) - sum(
o.get_load() for o in o_snd)
7 changes: 6 additions & 1 deletion src/lbaf/Model/lbsWorkModelBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,10 @@ def factory(work_name, parameters, lgr: Logger):

@abc.abstractmethod
def compute(self, rank):
"""Return value of work for given rank."""
"""Return value of work on given rank."""
# Must be implemented by concrete subclass

@abc.abstractmethod
def update(self, rank, o_snd, o_rcv):
"""Compute updated work on given rank if sending and receiving objects."""
# Must be implemented by concrete subclass
Loading