Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/challenging-toy-hundreds-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ algorithm:
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 1
fanout: 2
n_rounds: 3
fanout: 4
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 0
Expand Down
4 changes: 2 additions & 2 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from ..Model.lbsPhase import Phase


class ClusteringTransferStrategy(TransferStrategyBase):

Check notice on line 56 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many instance attributes (12/7) (too-many-instance-attributes)
"""A concrete class for the clustering-based transfer strategy."""

def __init__(self, criterion, parameters: dict, lgr: Logger):
Expand All @@ -76,10 +76,10 @@
self._logger.info(
f"Percentage of maximum load required for subclustering: {self.__subclustering_threshold}")

# Initialize fraction of local imbalance that must be resolved by subcluster
# Initialize fraction of local imbalance to be resolved by subcluster
self.__subclustering_minimum_improvement = parameters.get("subclustering_minimum_improvement", 0.0)
self._logger.info(
"Fraction of local imbalance that must be resolved by subcluster: "
"Local imbalance fraction to be resolved by subcluster: "
f"{self.__subclustering_minimum_improvement}")

# Initialize cluster swap relative threshold
Expand Down Expand Up @@ -250,7 +250,7 @@
# Reject subcluster transfer
self._n_rejects += len(o_src)

def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):

Check notice on line 253 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many branches (14/12) (too-many-branches)
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
Expand Down Expand Up @@ -301,7 +301,7 @@

# Report on new load and exit from rank
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")

Check notice on line 304 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Line too long (121/120) (line-too-long)
else:
# Subclustering is skipped altogether for all ranks
self.__n_sub_skipped = n_ranks
Expand Down
40 changes: 24 additions & 16 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from ..Model.lbsRank import Rank
from ..Model.lbsMessage import Message
from ..Model.lbsPhase import Phase
from ..IO.lbsStatistics import min_Hamming_distance, print_function_statistics
from ..IO.lbsStatistics import compute_function_statistics


class InformAndTransferAlgorithm(AlgorithmBase):
Expand Down Expand Up @@ -210,20 +210,25 @@ def __execute_information_stage(self):
self._logger.debug(
f"Peers known to rank {rank.get_id()}: {[r_k.get_id() for r_k in k_p]}")

# Report on final known information ratio
n_k = sum(len(k_p) for k_p in self.__known_peers.values() if k_p) / n_r
# Compute and report on final known information ratio
if n_r > 1:
# Knowledge ratios can be computed
sum_kappa = 0.0
known_fac = 1.0 / (n_r - 1.0)
for rank, peers in self.__known_peers.items():
kappa = known_fac * (len(peers) - 1)
rank.set_kappa(kappa)
sum_kappa += kappa
else:
self._logger.warning(
f"Cannot compute knowledge ratio with only {n_r} ranks")
self._logger.info(
f"Average number of peers known to ranks: {n_k} ({100 * n_k / n_r:.2f}% of {n_r})")
f"Average rank knowledge ratio: {sum_kappa / n_r:.4g}")

def execute(self, p_id: int, phases: list, statistics: dict):
""" Execute 2-phase information+transfer algorithm on Phase with index p_id."""
# Perform pre-execution checks and initializations
self._initialize(p_id, phases, statistics)
print_function_statistics(
self._rebalanced_phase.get_ranks(),
self._work_model.compute,
"initial rank work",
self._logger)

# Set phase to be used by transfer criterion
self.__transfer_criterion.set_phase(self._rebalanced_phase)
Expand Down Expand Up @@ -256,12 +261,15 @@ def execute(self, p_id: int, phases: list, statistics: dict):
self._logger.info(
f"Iteration {i + 1} completed ({n_ignored} skipped ranks) in {time.time() - start_time:.3f} seconds")

# Compute and report iteration work statistics
stats = print_function_statistics(
# Compute and report iteration load imbalance and maximum work
load_imb = compute_function_statistics(
self._rebalanced_phase.get_ranks(),
self._work_model.compute,
f"iteration {i + 1} rank work",
self._logger)
lambda x: x.get_load()).get_imbalance()
self._logger.info(f"\trank load imbalance: {load_imb:.6g}")
max_work = compute_function_statistics(
self._rebalanced_phase.get_ranks(),
self._work_model.compute).get_maximum()
self._logger.info(f"\tmaximum rank work: {max_work:.6g}")

# Update run statistics
self._update_statistics(statistics)
Expand All @@ -273,9 +281,9 @@ def execute(self, p_id: int, phases: list, statistics: dict):
self._initial_phase.get_lb_iterations().append(lb_iteration)

# Check if the current imbalance is within the target_imbalance range
if stats.statistics["imbalance"] <= self.__target_imbalance:
if load_imb <= self.__target_imbalance:
self._logger.info(
f"Reached target imbalance of {self.__target_imbalance} after {i + 1} iterations.")
f"Reached target load imbalance of {self.__target_imbalance:.6g} after {i + 1} iterations.")
break

# Report final mapping in debug mode
Expand Down
16 changes: 16 additions & 0 deletions src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def __init__(
# Start with empty metadata
self.__metadata = {}

# Initialize knowledge ratio to complete ignorance
self.__kappa = 0.0

# By default the rank is note connected to a node
self.__node = None

Expand Down Expand Up @@ -130,6 +133,19 @@ def set_size(self, size):
f"size: incorrect type {type(size)} or value: {size}")
self.__size = float(size)

@qoi
def get_kappa(self) -> float:
"""Return rank knowledge ratio."""
return self.__kappa

def set_kappa(self, kappa):
"""Set rank knowledge ratio, which is algorithm-specific."""
# Value in [0;1] is required
if not isinstance(kappa, float) or kappa < 0.0 or kappa > 1.0:
raise TypeError(
f"kappa: incorrect type {type(kappa)} or value: {kappa}")
self.__kappa = kappa

def get_metadata(self) -> dict:
"""Return original metadata."""
return self.__metadata
Expand Down
Loading