Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/challenging-toy-fewer-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.0
upper_bounds:
max_memory_usage: 8.0e+9

Expand Down
7 changes: 4 additions & 3 deletions config/challenging-toy-hundreds-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.0
upper_bounds:
max_memory_usage: 8000000000.0

Expand All @@ -20,12 +21,12 @@ algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 8
n_rounds: 4
n_iterations: 4
n_rounds: 3
fanout: 4
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10
max_subclusters: 0
cluster_swap_rtol: 0.05
criterion: Tempered
max_objects_per_transfer: 500
Expand Down
1 change: 1 addition & 0 deletions config/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.0

# Specify algorithm
algorithm:
Expand Down
3 changes: 2 additions & 1 deletion config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ work_model:
parameters:
beta: 0.0
gamma: 0.0
delta: 0.1
upper_bounds:
max_memory_usage: 45.0

Expand Down Expand Up @@ -44,7 +45,7 @@ visualization:
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
rank_qoi: homing
object_qoi: shared_id
save_meshes: true
force_continuous_object_qoi: true
Expand Down
1 change: 1 addition & 0 deletions docs/pages/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Example configuration
parameters:
beta: 0.
gamma: 0.
delta: 0.

# Specify balancing algorithm
algorithm:
Expand Down
1 change: 1 addition & 0 deletions docs/pages/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Synthetic Blocks Test Configuration
parameters:
beta: 0.
gamma: 0.
delta: 0.

# Specify balancing algorithm
algorithm:
Expand Down
5 changes: 5 additions & 0 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@
f"{phase_name} node maximum memory usage",
self.__logger)
if r_shared_mem_stats.get_maximum():
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_homing(),
f"{phase_name} homing cost",
self.__logger)
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_homed_blocks_ratio(),
Expand Down Expand Up @@ -435,15 +440,15 @@
if verbosity == 1:
self.__logger.info("\tRank QOI:")
for name, _ in rank_qois.items():
self.__logger.info("\t\t" + name)

Check warning on line 443 in src/lbaf/Applications/LBAF_app.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Use lazy % formatting in logging functions (logging-not-lazy)
elif verbosity > 1:
self.__logger.info("\tRank QOI:")
for name, _ in rank_qois.items():
self.__logger.info("\t\t" + name)

Check warning on line 447 in src/lbaf/Applications/LBAF_app.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Use lazy % formatting in logging functions (logging-not-lazy)
self.__logger.info("")
self.__logger.info("\tObject QOI:")
for name, _ in object_qois.items():
self.__logger.info("\t\t" + name)

Check warning on line 451 in src/lbaf/Applications/LBAF_app.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Use lazy % formatting in logging functions (logging-not-lazy)

def run(self, cfg=None, cfg_dir=None):
"""Run the LBAF application."""
Expand Down
2 changes: 2 additions & 0 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from typing import List

from ..IO.lbsStatistics import compute_function_statistics
from ..Model.lbsNode import Node

Check warning on line 47 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Node imported from Model.lbsNode (unused-import)
from ..Model.lbsPhase import Phase
from ..Model.lbsWorkModelBase import WorkModelBase
from ..Utils.lbsLogging import Logger
Expand Down Expand Up @@ -99,7 +99,9 @@
self.__statistics = {
("ranks", lambda x: x.get_load()): {
"maximum load": "maximum"},
("ranks", lambda x: self._work_model.compute(x)): {

Check warning on line 102 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Lambda may not be necessary (unnecessary-lambda)
"maximum work": "maximum"},
("ranks", lambda x: self._work_model.compute(x)): {

Check warning on line 104 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Lambda may not be necessary (unnecessary-lambda)
"total work": "sum"}}

def get_initial_communications(self):
Expand Down
7 changes: 4 additions & 3 deletions src/lbaf/Execution/lbsBruteForceAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ def execute(self, p_id: int, phases: list, statistics: dict):
n_ranks = len(phase_ranks)
affine_combination = isinstance(
self._work_model, AffineCombinationWorkModel)
alpha, beta, gamma = [
alpha, beta, gamma, delta = [
self._work_model.get_alpha() if affine_combination else 1.0,
self._work_model.get_beta() if affine_combination else 0.0,
self._work_model.get_gamma() if affine_combination else 0.0]
self._work_model.get_gamma() if affine_combination else 0.0,
self._work_model.get_delta() if affine_combination else 0.0]
_n_a, _w_min_max, a_min_max = compute_min_max_arrangements_work(
objects, alpha, beta, gamma, n_ranks,
objects, alpha, beta, gamma, delta, n_ranks,
logger=self._logger)

# Skip object transfers when requested
Expand Down
47 changes: 25 additions & 22 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from ..Model.lbsPhase import Phase


class ClusteringTransferStrategy(TransferStrategyBase):

Check notice on line 56 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many instance attributes (12/7) (too-many-instance-attributes)
"""A concrete class for the clustering-based transfer strategy."""

def __init__(self, criterion, parameters: dict, lgr: Logger):
Expand All @@ -76,10 +76,10 @@
self._logger.info(
f"Percentage of maximum load required for subclustering: {self.__subclustering_threshold}")

# Initialize fraction of local imbalance that must be resolved by subcluster
# Initialize fraction of local imbalance to be resolved by subcluster
self.__subclustering_minimum_improvement = parameters.get("subclustering_minimum_improvement", 0.0)
self._logger.info(
"Fraction of local imbalance that must be resolved by subcluster: "
"Local imbalance fraction to be resolved by subcluster: "
f"{self.__subclustering_minimum_improvement}")

# Initialize cluster swap relative threshold
Expand Down Expand Up @@ -201,7 +201,7 @@
self._n_rejects += len(o_src) + len(o_try)

# Return number of swaps performed from rank
n_rank_swaps = 0
return n_rank_swaps

def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None:
"""Perform feasible subcluster transfers from given rank to possible targets."""
Expand Down Expand Up @@ -250,23 +250,20 @@
# Reject subcluster transfer
self._n_rejects += len(o_src)

def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):

Check notice on line 253 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many branches (14/12) (too-many-branches)
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
rank_targets = self._get_ranks_to_traverse(phase.get_ranks(), known_peers)

# Iterate over ranks
n_ranks = len(phase.get_ranks())
for r_src, targets in rank_targets.items():
# Cluster migratable objects on source rank
clusters_src = self.__build_rank_clusters(r_src, True)
self._logger.debug(
f"Constructed {len(clusters_src)} migratable clusters on source rank {r_src.get_id()}")

# Skip subclustering for this rank when it must be done later
if self.__separate_subclustering:
continue

# Perform feasible cluster swaps from given rank to possible targets
if (n_rank_swaps := self.__swap_clusters(phase, r_src, clusters_src, targets)):
# Report on swaps when some occurred
Expand All @@ -280,30 +277,36 @@
continue

# Perform feasible subcluster swaps from given rank to possible targets
if self.__max_subclusters > 0:
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)
if not self.__separate_subclustering:
if self.__max_subclusters > 0:
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)
else:
self.__n_sub_skipped += 1

# Report on new load and exit from rank
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")

# Perform subclustering when it was not previously done
if self.__max_subclusters > 0 and self.__separate_subclustering:
# In non-deterministic case skip subclustering when swaps passed
if self.__n_swaps and not self._deterministic_transfer:
self.__n_sub_skipped += len(rank_targets)
if self.__separate_subclustering:
if self.__max_subclusters > 0:
# In non-deterministic case skip subclustering when swaps passed
if self.__n_swaps and not self._deterministic_transfer:
self.__n_sub_skipped = n_ranks
else:
# Iterate over ranks
for r_src, targets in rank_targets.items():
# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")

Check notice on line 304 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Line too long (121/120) (line-too-long)
else:
# Iterate over ranks
for r_src, targets in rank_targets.items():
# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")
# Subclustering is skipped altogether for all ranks
self.__n_sub_skipped = n_ranks

# Report on global transfer statistics
n_ranks = len(phase.get_ranks())
self._logger.info(
f"Swapped {self.__n_swaps} cluster pairs amongst {self.__n_swap_tries} tries "
f"({100 * self.__n_swaps / self.__n_swap_tries:.2f}%)")
Expand Down
46 changes: 26 additions & 20 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from ..Model.lbsRank import Rank
from ..Model.lbsMessage import Message
from ..Model.lbsPhase import Phase
from ..IO.lbsStatistics import min_Hamming_distance, print_function_statistics
from ..IO.lbsStatistics import compute_function_statistics


class InformAndTransferAlgorithm(AlgorithmBase):
Expand Down Expand Up @@ -210,30 +210,33 @@ def __execute_information_stage(self):
self._logger.debug(
f"Peers known to rank {rank.get_id()}: {[r_k.get_id() for r_k in k_p]}")

# Report on final known information ratio
n_k = sum(len(k_p) for k_p in self.__known_peers.values() if k_p) / n_r
# Compute and report on final known information ratio
if n_r > 1:
# Knowledge ratios can be computed
sum_kappa = 0.0
known_fac = 1.0 / (n_r - 1.0)
for rank, peers in self.__known_peers.items():
kappa = known_fac * (len(peers) - 1)
rank.set_kappa(kappa)
sum_kappa += kappa
else:
self._logger.warning(
f"Cannot compute knowledge ratio with only {n_r} ranks")
self._logger.info(
f"Average number of peers known to ranks: {n_k} ({100 * n_k / n_r:.2f}% of {n_r})")
f"Average rank knowledge ratio: {sum_kappa / n_r:.4g}")

def execute(self, p_id: int, phases: list, statistics: dict):
""" Execute 2-phase information+transfer algorithm on Phase with index p_id."""
# Perform pre-execution checks and initializations
self._initialize(p_id, phases, statistics)
print_function_statistics(
self._rebalanced_phase.get_ranks(),
self._work_model.compute,
"initial rank work",
self._logger)

# Set phase to be used by transfer criterion
self.__transfer_criterion.set_phase(self._rebalanced_phase)

# Retrieve total work from computed statistics
total_work = statistics["total work"][-1]

# Perform requested number of load-balancing iterations
s_name = "maximum work"
for i in range(self.__n_iterations):
self._logger.info(f"Starting iteration {i + 1} with total work of {total_work}")
self._logger.info(f"Starting iteration {i + 1} with {s_name} of {statistics[s_name][-1]:.6g}")

# Time the duration of each iteration
start_time = time.time()
Expand All @@ -256,12 +259,15 @@ def execute(self, p_id: int, phases: list, statistics: dict):
self._logger.info(
f"Iteration {i + 1} completed ({n_ignored} skipped ranks) in {time.time() - start_time:.3f} seconds")

# Compute and report iteration work statistics
stats = print_function_statistics(
# Compute and report iteration load imbalance and maximum work
load_imb = compute_function_statistics(
self._rebalanced_phase.get_ranks(),
lambda x: x.get_load()).get_imbalance()
self._logger.info(f"\trank load imbalance: {load_imb:.6g}")
max_work = compute_function_statistics(
self._rebalanced_phase.get_ranks(),
self._work_model.compute,
f"iteration {i + 1} rank work",
self._logger)
self._work_model.compute).get_maximum()
self._logger.info(f"\tmaximum rank work: {max_work:.6g}")

# Update run statistics
self._update_statistics(statistics)
Expand All @@ -273,9 +279,9 @@ def execute(self, p_id: int, phases: list, statistics: dict):
self._initial_phase.get_lb_iterations().append(lb_iteration)

# Check if the current imbalance is within the target_imbalance range
if stats.statistics["imbalance"] <= self.__target_imbalance:
if load_imb <= self.__target_imbalance:
self._logger.info(
f"Reached target imbalance of {self.__target_imbalance} after {i + 1} iterations.")
f"Reached target load imbalance of {self.__target_imbalance:.6g} after {i + 1} iterations.")
break

# Report final mapping in debug mode
Expand Down
47 changes: 34 additions & 13 deletions src/lbaf/Execution/lbsPhaseSpecification.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,29 @@
class TaskSpecification(TypedDict):
# The task time
time: float
# The optional collection id
# The task's sequential ID
seq_id: int
# Whether the task is migratable or not
migratable: bool
# The task's home rank
home: int
# The task's current node
node: int
# The collection id
collection_id: NotRequired[int]
# User-defined parameters
user_defined: NotRequired[dict]


class SharedBlockSpecification(TypedDict):
# The shared block size
size: float
# The ID of the unique rank to which a shared block ultimately belong
home: int
home_rank: int
# The set of tasks accessing this shared block
tasks: Set[int]
# the shared block ID
shared_id: int

CommunicationSpecification = TypedDict('CommunicationSpecification', {
'size': float,
Expand All @@ -72,6 +84,8 @@ class SharedBlockSpecification(TypedDict):
class RankSpecification(TypedDict):
# The task ids
tasks: Set[int]
id: int
user_defined: dict

class PhaseSpecification(TypedDict):
"""Dictionary representing a phase specification"""
Expand All @@ -97,6 +111,9 @@ class PhaseSpecification(TypedDict):
# Rank distributions
ranks: Dict[int,RankSpecification] # where index = rank id

# Phase id
id: int

class PhaseSpecificationNormalizer:
"""
Provides normalization and denormalization for PhaseSpecification
Expand Down Expand Up @@ -156,27 +173,29 @@ def denormalize(self, data: dict)-> PhaseSpecification:
- `data.ranks.tasks`
- `data.ranks.communications`

This method should be called after json or yaml deserialization.
This method should be called after JSON or YAML deserialization.
This is the reverse implementation of the normalize method.
"""
def dict_merge(a, b):
a.update(b)
return a

return PhaseSpecification({
"tasks": self.__normalize_member(
data.get("tasks", []),
lambda t: TaskSpecification(dict_merge(
{ "time": t.get("time", 0.0) },
{ "collection_id": t.get("collection_id", None)} if "collection_id" in t else {}
))
lambda t: TaskSpecification({
"time": t.get("time", 0.0),
"seq_id": t.get("seq_id", None),
"migratable": t.get("migratable", True),
"home": t.get("home") if "home" in t else {},
"node": t.get("node") if "node" in t else {},
"collection_id": t.get("collection_id") if "collection_id" in t else {},
"user_defined": t.get("user_defined") if "user_defined" in t else {}
})
),
"shared_blocks": self.__normalize_member(
data.get("shared_blocks", []),
lambda b: SharedBlockSpecification({
"size": b.get("size", 0.0),
"tasks": set(b.get("tasks", {})),
"home_rank": b.get("home_rank")
"home_rank": b.get("home_rank"),
"shared_id": b.get("shared_id")
})
),
"communications": self.__normalize_member(
Expand All @@ -186,7 +205,9 @@ def dict_merge(a, b):
"ranks": self.__normalize_member(
data.get("ranks", []),
lambda r: RankSpecification({
"tasks": set(r.get("tasks", []))
"tasks": set(r.get("tasks", [])),
"id": set(r.get("id", [])),
"user_defined": set(r.get("user_defined", {}) if "user_defined" in r else {})
})
)
})
Loading
Loading