diff --git a/config/challenging-toy-fewer-tasks.yaml b/config/challenging-toy-fewer-tasks.yaml index 0a37b5e3..3c6d0383 100644 --- a/config/challenging-toy-fewer-tasks.yaml +++ b/config/challenging-toy-fewer-tasks.yaml @@ -12,6 +12,7 @@ work_model: parameters: beta: 0.0 gamma: 0.0 + delta: 0.0 upper_bounds: max_memory_usage: 8.0e+9 diff --git a/config/challenging-toy-hundreds-tasks.yaml b/config/challenging-toy-hundreds-tasks.yaml index c1315366..a58cc83a 100644 --- a/config/challenging-toy-hundreds-tasks.yaml +++ b/config/challenging-toy-hundreds-tasks.yaml @@ -12,6 +12,7 @@ work_model: parameters: beta: 0.0 gamma: 0.0 + delta: 0.0 upper_bounds: max_memory_usage: 8000000000.0 @@ -20,12 +21,12 @@ algorithm: name: InformAndTransfer phase_id: 0 parameters: - n_iterations: 8 - n_rounds: 4 + n_iterations: 4 + n_rounds: 3 fanout: 4 order_strategy: arbitrary transfer_strategy: Clustering - max_subclusters: 10 + max_subclusters: 0 cluster_swap_rtol: 0.05 criterion: Tempered max_objects_per_transfer: 500 diff --git a/config/conf.yaml b/config/conf.yaml index 8b823ac2..8da10c4c 100644 --- a/config/conf.yaml +++ b/config/conf.yaml @@ -11,6 +11,7 @@ work_model: parameters: beta: 0.0 gamma: 0.0 + delta: 0.0 # Specify algorithm algorithm: diff --git a/config/synthetic-blocks.yaml b/config/synthetic-blocks.yaml index fac6b849..daa70440 100644 --- a/config/synthetic-blocks.yaml +++ b/config/synthetic-blocks.yaml @@ -12,6 +12,7 @@ work_model: parameters: beta: 0.0 gamma: 0.0 + delta: 0.1 upper_bounds: max_memory_usage: 45.0 @@ -44,7 +45,7 @@ visualization: y_ranks: 2 z_ranks: 1 object_jitter: 0.5 - rank_qoi: load + rank_qoi: homing object_qoi: shared_id save_meshes: true force_continuous_object_qoi: true diff --git a/docs/pages/configuration.rst b/docs/pages/configuration.rst index b0f38f27..683878d6 100644 --- a/docs/pages/configuration.rst +++ b/docs/pages/configuration.rst @@ -76,6 +76,7 @@ Example configuration parameters: beta: 0. gamma: 0. + delta: 0. # Specify balancing algorithm algorithm: diff --git a/docs/pages/testing.rst b/docs/pages/testing.rst index 70de1524..e5b123e4 100644 --- a/docs/pages/testing.rst +++ b/docs/pages/testing.rst @@ -70,6 +70,7 @@ Synthetic Blocks Test Configuration parameters: beta: 0. gamma: 0. + delta: 0. 
# Specify balancing algorithm algorithm: diff --git a/src/lbaf/Applications/LBAF_app.py b/src/lbaf/Applications/LBAF_app.py index 528a36ce..e257e91c 100644 --- a/src/lbaf/Applications/LBAF_app.py +++ b/src/lbaf/Applications/LBAF_app.py @@ -390,6 +390,11 @@ def __print_statistics(self, phase: Phase, phase_name: str, work_model: WorkMode f"{phase_name} node maximum memory usage", self.__logger) if r_shared_mem_stats.get_maximum(): + lbstats.print_function_statistics( + phase.get_ranks(), + lambda x: x.get_homing(), + f"{phase_name} homing cost", + self.__logger) lbstats.print_function_statistics( phase.get_ranks(), lambda x: x.get_homed_blocks_ratio(), diff --git a/src/lbaf/Execution/lbsAlgorithmBase.py b/src/lbaf/Execution/lbsAlgorithmBase.py index d7a19191..83c25676 100644 --- a/src/lbaf/Execution/lbsAlgorithmBase.py +++ b/src/lbaf/Execution/lbsAlgorithmBase.py @@ -99,6 +99,8 @@ def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger): self.__statistics = { ("ranks", lambda x: x.get_load()): { "maximum load": "maximum"}, + ("ranks", lambda x: self._work_model.compute(x)): { + "maximum work": "maximum"}, ("ranks", lambda x: self._work_model.compute(x)): { "total work": "sum"}} diff --git a/src/lbaf/Execution/lbsBruteForceAlgorithm.py b/src/lbaf/Execution/lbsBruteForceAlgorithm.py index fff32a94..447ac9d4 100644 --- a/src/lbaf/Execution/lbsBruteForceAlgorithm.py +++ b/src/lbaf/Execution/lbsBruteForceAlgorithm.py @@ -76,12 +76,13 @@ def execute(self, p_id: int, phases: list, statistics: dict): n_ranks = len(phase_ranks) affine_combination = isinstance( self._work_model, AffineCombinationWorkModel) - alpha, beta, gamma = [ + alpha, beta, gamma, delta = [ self._work_model.get_alpha() if affine_combination else 1.0, self._work_model.get_beta() if affine_combination else 0.0, - self._work_model.get_gamma() if affine_combination else 0.0] + self._work_model.get_gamma() if affine_combination else 0.0, + self._work_model.get_delta() if affine_combination else 0.0] _n_a, _w_min_max, a_min_max = compute_min_max_arrangements_work( - objects, alpha, beta, gamma, n_ranks, + objects, alpha, beta, gamma, delta, n_ranks, logger=self._logger) # Skip object transfers when requested diff --git a/src/lbaf/Execution/lbsClusteringTransferStrategy.py b/src/lbaf/Execution/lbsClusteringTransferStrategy.py index a10188d7..3e3ea432 100644 --- a/src/lbaf/Execution/lbsClusteringTransferStrategy.py +++ b/src/lbaf/Execution/lbsClusteringTransferStrategy.py @@ -76,10 +76,10 @@ def __init__(self, criterion, parameters: dict, lgr: Logger): self._logger.info( f"Percentage of maximum load required for subclustering: {self.__subclustering_threshold}") - # Initialize fraction of local imbalance that must be resolved by subcluster + # Initialize fraction of local imbalance to be resolved by subcluster self.__subclustering_minimum_improvement = parameters.get("subclustering_minimum_improvement", 0.0) self._logger.info( - "Fraction of local imbalance that must be resolved by subcluster: " + "Local imbalance fraction to be resolved by subcluster: " f"{self.__subclustering_minimum_improvement}") # Initialize cluster swap relative threshold @@ -201,7 +201,7 @@ def __swap_clusters(self, phase: Phase, r_src: Rank, clusters_src:dict, targets: self._n_rejects += len(o_src) + len(o_try) # Return number of swaps performed from rank - n_rank_swaps = 0 + return n_rank_swaps def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None: """Perform feasible subcluster 
transfers from given rank to possible targets.""" @@ -257,16 +257,13 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): rank_targets = self._get_ranks_to_traverse(phase.get_ranks(), known_peers) # Iterate over ranks + n_ranks = len(phase.get_ranks()) for r_src, targets in rank_targets.items(): # Cluster migratable objects on source rank clusters_src = self.__build_rank_clusters(r_src, True) self._logger.debug( f"Constructed {len(clusters_src)} migratable clusters on source rank {r_src.get_id()}") - # Skip subclustering for this rank when it must be done later - if self.__separate_subclustering: - continue - # Perform feasible cluster swaps from given rank to possible targets if (n_rank_swaps := self.__swap_clusters(phase, r_src, clusters_src, targets)): # Report on swaps when some occurred @@ -280,30 +277,36 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): continue # Perform feasible subcluster swaps from given rank to possible targets - if self.__max_subclusters > 0: - self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + if not self.__separate_subclustering: + if self.__max_subclusters > 0: + self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + else: + self.__n_sub_skipped += 1 # Report on new load and exit from rank self._logger.debug( f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers") # Perform subclustering when it was not previously done - if self.__max_subclusters > 0 and self.__separate_subclustering: - # In non-deterministic case skip subclustering when swaps passed - if self.__n_swaps and not self._deterministic_transfer: - self.__n_sub_skipped += len(rank_targets) + if self.__separate_subclustering: + if self.__max_subclusters > 0: + # In non-deterministic case skip subclustering when swaps passed + if self.__n_swaps and not self._deterministic_transfer: + self.__n_sub_skipped = n_ranks + else: + # Iterate over ranks + for r_src, targets in rank_targets.items(): + # Perform feasible subcluster swaps from given rank to possible targets + self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + + # Report on new load and exit from rank + self._logger.debug( + f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers") else: - # Iterate over ranks - for r_src, targets in rank_targets.items(): - # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) - - # Report on new load and exit from rank - self._logger.debug( - f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers") + # Subclustering is skipped altogether for all ranks + self.__n_sub_skipped = n_ranks # Report on global transfer statistics - n_ranks = len(phase.get_ranks()) self._logger.info( f"Swapped {self.__n_swaps} cluster pairs amongst {self.__n_swap_tries} tries " f"({100 * self.__n_swaps / self.__n_swap_tries:.2f}%)") diff --git a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py index bbca68e4..368041a2 100644 --- a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py +++ b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py @@ -50,7 +50,7 @@ from ..Model.lbsRank import Rank from ..Model.lbsMessage import Message from ..Model.lbsPhase import Phase -from ..IO.lbsStatistics import min_Hamming_distance, print_function_statistics +from 
..IO.lbsStatistics import compute_function_statistics class InformAndTransferAlgorithm(AlgorithmBase): @@ -210,30 +210,33 @@ def __execute_information_stage(self): self._logger.debug( f"Peers known to rank {rank.get_id()}: {[r_k.get_id() for r_k in k_p]}") - # Report on final known information ratio - n_k = sum(len(k_p) for k_p in self.__known_peers.values() if k_p) / n_r + # Compute and report on final known information ratio + sum_kappa = 0.0 + if n_r > 1: + # Knowledge ratios can be computed + known_fac = 1.0 / (n_r - 1.0) + for rank, peers in self.__known_peers.items(): + kappa = known_fac * (len(peers) - 1) + rank.set_kappa(kappa) + sum_kappa += kappa + else: + self._logger.warning( + f"Cannot compute knowledge ratio with only {n_r} ranks") self._logger.info( - f"Average number of peers known to ranks: {n_k} ({100 * n_k / n_r:.2f}% of {n_r})") + f"Average rank knowledge ratio: {sum_kappa / n_r:.4g}") def execute(self, p_id: int, phases: list, statistics: dict): """ Execute 2-phase information+transfer algorithm on Phase with index p_id.""" # Perform pre-execution checks and initializations self._initialize(p_id, phases, statistics) - print_function_statistics( - self._rebalanced_phase.get_ranks(), - self._work_model.compute, - "initial rank work", - self._logger) # Set phase to be used by transfer criterion self.__transfer_criterion.set_phase(self._rebalanced_phase) - # Retrieve total work from computed statistics - total_work = statistics["total work"][-1] - # Perform requested number of load-balancing iterations + s_name = "maximum work" for i in range(self.__n_iterations): - self._logger.info(f"Starting iteration {i + 1} with total work of {total_work}") + self._logger.info(f"Starting iteration {i + 1} with {s_name} of {statistics[s_name][-1]:.6g}") # Time the duration of each iteration start_time = time.time() @@ -256,12 +259,15 @@ def execute(self, p_id: int, phases: list, statistics: dict): self._logger.info( f"Iteration {i + 1} completed ({n_ignored} skipped ranks) in {time.time() - start_time:.3f} seconds") - # Compute and report iteration work statistics - stats = print_function_statistics( + # Compute and report iteration load imbalance and maximum work + load_imb = compute_function_statistics( + self._rebalanced_phase.get_ranks(), + lambda x: x.get_load()).get_imbalance() + self._logger.info(f"\trank load imbalance: {load_imb:.6g}") + max_work = compute_function_statistics( self._rebalanced_phase.get_ranks(), - self._work_model.compute, - f"iteration {i + 1} rank work", - self._logger) + self._work_model.compute).get_maximum() + self._logger.info(f"\tmaximum rank work: {max_work:.6g}") # Update run statistics self._update_statistics(statistics) @@ -273,9 +279,9 @@ def execute(self, p_id: int, phases: list, statistics: dict): self._initial_phase.get_lb_iterations().append(lb_iteration) # Check if the current imbalance is within the target_imbalance range - if stats.statistics["imbalance"] <= self.__target_imbalance: + if load_imb <= self.__target_imbalance: self._logger.info( - f"Reached target imbalance of {self.__target_imbalance} after {i + 1} iterations.") + f"Reached target load imbalance of {self.__target_imbalance:.6g} after {i + 1} iterations.") break # Report final mapping in debug mode diff --git a/src/lbaf/Execution/lbsPhaseSpecification.py b/src/lbaf/Execution/lbsPhaseSpecification.py index 779ad753..25eaecdb 100644 --- a/src/lbaf/Execution/lbsPhaseSpecification.py +++ b/src/lbaf/Execution/lbsPhaseSpecification.py @@ -51,17 +51,29 @@ class
TaskSpecification(TypedDict): # The task time time: float - # The optional collection id + # The task's sequential ID + seq_id: int + # Whether the task is migratable or not + migratable: bool + # The task's home rank + home: int + # The task's current node + node: int + # The collection id collection_id: NotRequired[int] + # User-defined parameters + user_defined: NotRequired[dict] class SharedBlockSpecification(TypedDict): # The shared block size size: float # The ID of the unique rank to which a shared block ultimately belong - home: int + home_rank: int # The set of tasks accessing this shared block tasks: Set[int] + # The shared block ID + shared_id: int CommunicationSpecification = TypedDict('CommunicationSpecification', { 'size': float, @@ -72,6 +84,8 @@ class SharedBlockSpecification(TypedDict): class RankSpecification(TypedDict): # The task ids tasks: Set[int] + id: int + user_defined: dict class PhaseSpecification(TypedDict): """Dictionary representing a phase specification""" @@ -97,6 +111,9 @@ class PhaseSpecification(TypedDict): # Rank distributions ranks: Dict[int,RankSpecification] # where index = rank id + # Phase id + id: int + class PhaseSpecificationNormalizer: """ Provides normalization and denormalization for PhaseSpecification @@ -156,27 +173,29 @@ def denormalize(self, data: dict)-> PhaseSpecification: - `data.ranks.tasks` - `data.ranks.communications` - This method should be called after json or yaml deserialization. + This method should be called after JSON or YAML deserialization. This is the reverse implementation of the normalize method. """ - def dict_merge(a, b): - a.update(b) - return a - return PhaseSpecification({ "tasks": self.__normalize_member( data.get("tasks", []), - lambda t: TaskSpecification(dict_merge( - { "time": t.get("time", 0.0) }, - { "collection_id": t.get("collection_id", None)} if "collection_id" in t else {} - )) + lambda t: TaskSpecification({ + "time": t.get("time", 0.0), + "seq_id": t.get("seq_id", None), + "migratable": t.get("migratable", True), + "home": t.get("home") if "home" in t else {}, + "node": t.get("node") if "node" in t else {}, + "collection_id": t.get("collection_id") if "collection_id" in t else {}, + "user_defined": t.get("user_defined") if "user_defined" in t else {} + }) ), "shared_blocks": self.__normalize_member( data.get("shared_blocks", []), lambda b: SharedBlockSpecification({ "size": b.get("size", 0.0), "tasks": set(b.get("tasks", {})), - "home_rank": b.get("home_rank") + "home_rank": b.get("home_rank"), + "shared_id": b.get("shared_id") }) ), "communications": self.__normalize_member( @@ -186,7 +205,9 @@ def dict_merge(a, b): "ranks": self.__normalize_member( data.get("ranks", []), lambda r: RankSpecification({ - "tasks": set(r.get("tasks", [])) + "tasks": set(r.get("tasks", [])), + "id": r.get("id"), + "user_defined": r.get("user_defined", {}) }) ) }) diff --git a/src/lbaf/Execution/lbsTransferStrategyBase.py b/src/lbaf/Execution/lbsTransferStrategyBase.py index dc3ef770..a589f2da 100644 --- a/src/lbaf/Execution/lbsTransferStrategyBase.py +++ b/src/lbaf/Execution/lbsTransferStrategyBase.py @@ -92,8 +92,8 @@ def _initialize_transfer_stage(self, ave_load: float): """Initialize transfer stage consistently across strategies.""" # Keep track of average load + self._logger.info(f"Executing transfer phase with average load: {ave_load}") self._average_load = ave_load - self._logger.info(f"Executing transfer phase with average load: {self._average_load}") # Initialize
numbers of transfers and rejects self._n_transfers = 0 diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index 78c5ad4e..c91c8701 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -92,6 +92,7 @@ def __init__(self, config_to_validate: dict, logger: Logger): "parameters": { "beta": float, "gamma": float, + Optional("delta"): float, Optional("upper_bounds"): And( dict, lambda x: all(isinstance(y, float) for y in x.values()))}}, diff --git a/src/lbaf/IO/lbsStatistics.py b/src/lbaf/IO/lbsStatistics.py index e7bba552..629c3745 100644 --- a/src/lbaf/IO/lbsStatistics.py +++ b/src/lbaf/IO/lbsStatistics.py @@ -214,7 +214,10 @@ def compute_load(objects: tuple, rank_object_ids: list) -> float: return sum(objects[i].get_load() for i in rank_object_ids) -def compute_arrangement_works(objects: tuple, arrangement: tuple, alpha: float, beta: float, gamma: float) -> dict: +def compute_arrangement_works( + objects: tuple, arrangement: tuple, + alpha: float, beta: float, gamma: float, delta: float, + logger: Optional[Logger] = None) -> dict: """Return a dictionary with works of rank objects.""" # Build object rank map from arrangement ranks = {} @@ -229,8 +232,14 @@ def compute_arrangement_works(objects: tuple, arrangement: tuple, alpha: float, # Do not calculate communications if beta = 0 for better performance if beta > 0.0: - works[rank] += beta * max(compute_volume(objects, rank_objects, "received"), - compute_volume(objects, rank_objects, "sent")) + works[rank] += beta * max( + compute_volume(objects, rank_objects, "received"), + compute_volume(objects, rank_objects, "sent")) + + # Homing cost not calculated yet + if delta > 0.0 and logger is not None: + logger.error("Delta homing cost not calculated yet") + raise SystemExit(1) # Add constant works[rank] += gamma @@ -239,8 +248,11 @@ def compute_arrangement_works(objects: tuple, arrangement: tuple, alpha: float, return works -def compute_min_max_arrangements_work(objects: tuple, alpha: float, beta: float, gamma: float, n_ranks: int, - sanity_checks=True, logger: Optional[Logger] = None): +def compute_min_max_arrangements_work( + objects: tuple, + alpha: float, beta: float, gamma: float, delta: float, + n_ranks: int, + sanity_checks=True, logger: Optional[Logger] = None): """Compute all possible arrangements with repetition and minimax work.""" # Initialize quantities of interest n_arrangements = 0 @@ -248,7 +260,8 @@ def compute_min_max_arrangements_work(objects: tuple, alpha: float, beta: float, arrangements_min_max = [] for arrangement in itertools.product(range(n_ranks), repeat=len(objects)): # Compute per-rank works for current arrangement - works = compute_arrangement_works(objects, arrangement, alpha, beta, gamma) + works = compute_arrangement_works( + objects, arrangement, alpha, beta, gamma, delta) # Update minmax when relevant work_max = max(works.values()) @@ -281,7 +294,8 @@ def compute_min_max_arrangements_work(objects: tuple, alpha: float, beta: float, def compute_pairwise_reachable_arrangements( - objects: tuple, arrangement: tuple, alpha: float, beta: float, gamma: float, + objects: tuple, arrangement: tuple, + alpha: float, beta: float, gamma: float, delta: float, w_max: float, from_id: int, to_id: int, n_ranks: int, max_objects: Optional[int] = None, logger: Optional[Logger] = None): """Compute arrangements reachable by moving up to a given maximum number of objects.""" @@ -312,8 +326,10 @@ def compute_pairwise_reachable_arrangements( for 
c in itertools.combinations(matches, n): # Change all corresponding entries n_possible += 1 - new_arrangement = tuple(to_id if i in c else r for i, r in enumerate(arrangement)) - works = compute_arrangement_works(objects, new_arrangement, alpha, beta, gamma) + new_arrangement = tuple( + to_id if i in c else r for i, r in enumerate(arrangement)) + works = compute_arrangement_works( + objects, new_arrangement, alpha, beta, gamma, delta) # Check whether new arrangements is reachable w_max_new = max(works.values()) @@ -441,8 +457,10 @@ def print_subset_statistics(subset_name, subset_size, set_name, set_size, logger ss = f"{100. * subset_size / set_size:.3g}" if set_size else '' logger.info(f"{subset_name}: {subset_size:.6g} amongst {set_size:.6g} {set_name} ({ss}%)") -def compute_all_reachable_arrangements(objects: tuple, arrangement: tuple, alpha: float, beta: float, gamma: float, - w_max: float, n_ranks: int, logger: Logger, max_objects: int = None): +def compute_all_reachable_arrangements( + objects: tuple, arrangement: tuple, + alpha: float, beta: float, gamma: float, delta: float, + w_max: float, n_ranks: int, logger: Logger, max_objects: int = None): """Compute all arrangements reachable by moving up to a maximum number of objects.""" logger.warning("Old function. Used by the removed Rank Object Enumerator script") @@ -456,8 +474,10 @@ def compute_all_reachable_arrangements(objects: tuple, arrangement: tuple, alpha for to_id in range(n_ranks): if from_id == to_id: continue - reachable.update(compute_pairwise_reachable_arrangements(objects, arrangement, alpha, beta, gamma, w_max, - from_id, to_id, n_ranks, max_objects, logger)) + reachable.update(compute_pairwise_reachable_arrangements( + objects, arrangement, + alpha, beta, gamma, delta, + w_max,from_id, to_id, n_ranks, max_objects, logger)) logger.info( f"Found {len(reachable)} reachable arrangements, with maximum work: {max(reachable.values())}:") for k, v in reachable.items(): @@ -467,9 +487,11 @@ def compute_all_reachable_arrangements(objects: tuple, arrangement: tuple, alpha return reachable -def recursively_compute_transitions(stack: list, visited: dict, objects: tuple, arrangement: tuple, alpha: float, - beta: float, gamma: float, w_max: float, w_min_max: float, n_ranks: int, - logger: Logger, max_objects: Optional[int] = None): +def recursively_compute_transitions( + stack: list, visited: dict, objects: tuple, arrangement: tuple, + alpha: float, beta: float, gamma: float, delta: float, + w_max: float, w_min_max: float, n_ranks: int, + logger: Logger, max_objects: Optional[int] = None): """Recursively compute all possible transitions to reachable arrangements from initial one.""" logger.warning("Old function. 
Used by the removed Rank Object Enumerator script") @@ -494,7 +516,7 @@ def recursively_compute_transitions(stack: list, visited: dict, objects: tuple, reachable = compute_all_reachable_arrangements( objects, arrangement, - alpha, beta, gamma, + alpha, beta, gamma, delta, w_max, n_ranks, max_objects) @@ -512,7 +534,7 @@ def recursively_compute_transitions(stack: list, visited: dict, objects: tuple, visited, objects, k, - alpha, beta, gamma, + alpha, beta, gamma, delta, w_max, w_min_max, n_ranks, max_objects) diff --git a/src/lbaf/Model/lbsAffineCombinationWorkModel.py b/src/lbaf/Model/lbsAffineCombinationWorkModel.py index 57d35bb3..6bf953c3 100644 --- a/src/lbaf/Model/lbsAffineCombinationWorkModel.py +++ b/src/lbaf/Model/lbsAffineCombinationWorkModel.py @@ -54,7 +54,7 @@ class AffineCombinationWorkModel(WorkModelBase): def __init__(self, parameters, lgr: Logger): """Class constructor: - parameters: dictionary with alpha, beta, and gamma values. + parameters: dictionary with alpha, beta, gamma and delta values. """ # Assign logger to instance variable self.__logger = lgr @@ -62,6 +62,7 @@ def __init__(self, parameters, lgr: Logger): # Use default values if parameters not provided self.__beta = parameters.get("beta", 0.0) self.__gamma = parameters.get("gamma", 0.0) + self.__delta = parameters.get("delta", 0.0) self.__upper_bounds = parameters.get("upper_bounds", {}) self.__node_bounds = parameters.get("node_bounds", False) @@ -69,7 +70,7 @@ def __init__(self, parameters, lgr: Logger): super().__init__(parameters) self.__logger.info( "Instantiated work model with: " - f"beta={self.__beta}, gamma={self.__gamma}") + f"beta={self.__beta}, gamma={self.__gamma}, delta={self.__delta}") for k, v in self.__upper_bounds.items(): self.__logger.info( f"Upper bound for {'node' if self.__node_bounds else 'rank'} {k}: {v}") @@ -82,14 +83,18 @@ def get_gamma(self): """Get the gamma parameter.""" return self.__gamma - def affine_combination(self, a, l, v1, v2): - """Compute affine combination of load and maximum volume.""" - return a * l + self.__beta * max(v1, v2) + self.__gamma + def get_delta(self): + """Get the delta parameter.""" + return self.__delta + + def affine_combination(self, a, l, v1, v2, h): + """Compute affine combination of load, maximum volume, and homing cost.""" + return a * l + self.__beta * max(v1, v2) + self.__gamma + self.__delta * h def compute(self, rank: Rank): """A work model with affine combination of load and communication. - alpha * load + beta * max(sent, received) + gamma, + alpha * load + beta * max(sent, received) + gamma + delta * homing, under optional strict upper bounds. 
""" # Check whether strict bounds are satisfied @@ -104,4 +109,5 @@ def compute(self, rank: Rank): rank.get_alpha(), rank.get_load(), rank.get_received_volume(), - rank.get_sent_volume()) + rank.get_sent_volume(), + rank.get_homing()) diff --git a/src/lbaf/Model/lbsPhase.py b/src/lbaf/Model/lbsPhase.py index 8c2792b6..02735e73 100644 --- a/src/lbaf/Model/lbsPhase.py +++ b/src/lbaf/Model/lbsPhase.py @@ -457,7 +457,8 @@ def populate_from_samplers(self, n_ranks, n_objects, t_sampler, v_sampler, c_deg raise SystemExit(1) # Compute and report communication volume statistics - print_function_statistics(v_sent, lambda x: x, "communication volumes", self.__logger) + print_function_statistics( + v_sent, lambda x: x, "communication volumes", self.__logger) # Create given number of ranks self.__ranks = [Rank(self.__logger, r_id) for r_id in range(n_ranks)] diff --git a/src/lbaf/Model/lbsRank.py b/src/lbaf/Model/lbsRank.py index 5c835258..117efb0f 100644 --- a/src/lbaf/Model/lbsRank.py +++ b/src/lbaf/Model/lbsRank.py @@ -82,6 +82,9 @@ def __init__( # Start with empty metadata self.__metadata = {} + # Initialize knowledge ratio to complete ignorance + self.__kappa = 0.0 + # By default the rank is note connected to a node self.__node = None @@ -130,6 +133,19 @@ def set_size(self, size): f"size: incorrect type {type(size)} or value: {size}") self.__size = float(size) + @qoi + def get_kappa(self) -> float: + """Return rank knowledge ratio.""" + return self.__kappa + + def set_kappa(self, kappa): + """Set rank knowledge ratio, which is algorithm-specific.""" + # Value in [0;1] is required + if not isinstance(kappa, float) or kappa < 0.0 or kappa > 1.0: + raise TypeError( + f"kappa: incorrect type {type(kappa)} or value: {kappa}") + self.__kappa = kappa + def get_metadata(self) -> dict: """Return original metadata.""" return self.__metadata @@ -165,6 +181,14 @@ def get_number_of_homed_blocks(self) -> float: b.get_home_id() == self.get_id() for b in self.get_shared_blocks()) + @qoi + def get_homing(self) -> float: + """Return homing cost on rank.""" + val : float = 0.0 + return val + sum( + b.get_size() for b in self.get_shared_blocks() + if self.get_id() != b.get_home_id()) + @qoi def get_number_of_uprooted_blocks(self) -> float: """Return number of uprooted memory blocks on rank.""" @@ -269,22 +293,21 @@ def get_alpha(self) -> float: def get_load(self) -> float: """Return total load on rank.""" val : float = 0.0 - val += sum(o.get_load() for o in self.__migratable_objects.union(self.__sentinel_objects)) - return val + return val + sum( + o.get_load() + for o in self.__migratable_objects.union(self.__sentinel_objects)) @qoi def get_migratable_load(self) -> float: """Return migratable load on rank.""" val : float = 0.0 - val += sum(o.get_load() for o in self.__migratable_objects) - return val + return val + sum(o.get_load() for o in self.__migratable_objects) @qoi def get_sentinel_load(self) -> float: """Return sentinel load on rank.""" val : float = 0.0 - val += sum(o.get_load() for o in self.__sentinel_objects) - return val + return val + sum(o.get_load() for o in self.__sentinel_objects) @qoi def get_received_volume(self) -> float: @@ -298,7 +321,9 @@ def get_received_volume(self) -> float: continue # Add total volume received from non-local objects - volume += sum(v for k, v in o.get_communicator().get_received().items() if k not in obj_set) + volume += sum( + v for k, v in o.get_communicator().get_received().items() + if k not in obj_set) # Return computed volume return volume diff --git 
a/src/lbaf/Utils/lbsJSONDataFilesMaker.py b/src/lbaf/Utils/lbsJSONDataFilesMaker.py index 5bce0a08..c9d10bac 100644 --- a/src/lbaf/Utils/lbsJSONDataFilesMaker.py +++ b/src/lbaf/Utils/lbsJSONDataFilesMaker.py @@ -57,7 +57,7 @@ --data-stem=/home/john/data-maker/dataset1 ``` -- Generate dataset from specification file and sample configuration file configured to use the generated data stem +- Generate dataset from specification file and Python configuration file configured to use the generated data stem ``` lbaf-vt-data-files-maker \ @@ -69,10 +69,6 @@ - Generate dataset from specification defined interactively in CLI `lbaf-vt-data-files-maker --interactive` -Sample specification: a sample specification can be loaded in the interactive mode and be printed as an example in -either YAML or JSON format. -Other examples can be found as unit tests configuration files in the the tests/unit/config/phases directory - """ from argparse import RawTextHelpFormatter from datetime import datetime @@ -193,7 +189,7 @@ def __init__(self, logger: Optional[Logger] = None): allow_abbrev=False, description=( "Utility to generate a data set supporting shared blocks by using a specification file.\n" + - "Note: a sample specification can be loaded in the interactive mode and be printed\n" + + "Note: a Python specification can be loaded in the interactive mode and be printed\n" + "as an example in either YAML or JSON format."), prompt_default=False, formatter_class=RawTextHelpFormatter @@ -282,70 +278,6 @@ def build(self): self.__args.data_stem = data_stem - def load_sample(self, use_explicit_keys: bool = False): - """Create a new sample specification as represented by diagram specified in issue #506 - This method implementation indicates also how to create a specification from Python code - """ - - spec = PhaseSpecification({ - "tasks": [ - TaskSpecification({ - "collection_id": 0, - "time": 2.0 - }), - TaskSpecification({ - "collection_id": 0, - "time": 3.5 - }), - TaskSpecification({ - "collection_id": 0, - "time": 5.0 - }) - ], - "communications": [ - CommunicationSpecification({ - "size": 10000.0, # c1 (size) - "from": 0, # from t1 - "to": 2 # to t3 - }), - CommunicationSpecification({ - "size": 15000.0, # c2 (size) - "from": 1, # from t2 - "to": 2 # to t3 - }), - CommunicationSpecification({ - "size": 20000.0, # c3 (size) - "from": 2, # from t3 - "to": 1 # to t2 - }) - ], - "shared_blocks": [ - # S1 - SharedBlockSpecification({ - "size": 10000.0, - "home_rank": 0, - "tasks": {0, 1} - }), - # S2 - SharedBlockSpecification({ - "size": 15000.0, - "home_rank": 1, - "tasks": {2} - }) - ], - "ranks": { - 0: RankSpecification({"tasks": {0, 1}}), - 1: RankSpecification({"tasks": {2}}) - } - }) - - if use_explicit_keys: - spec["tasks"] = dict(enumerate(spec["tasks"])) - spec["communications"] = dict(enumerate(spec["communications"])) - spec["shared_blocks"] = dict(enumerate(spec["shared_blocks"])) - - self.spec = spec - def load_spec_from_file(self, file_path) -> Optional[PhaseSpecificationNormalizer]: """Load a specification from a file (Yaml or Json) @@ -357,7 +289,7 @@ def load_spec_from_file(self, file_path) -> Optional[PhaseSpecificationNormalize raise FileNotFoundError("File not found") spec = PhaseSpecification() - with open(file_path, "r", encoding="utf-8") as file_stream: + with open(file_path, 'r', encoding="utf-8") as file_stream: if file_path.endswith(".json"): spec_dict = json.load(file_stream) # in json keys are strings (int not supported by the JSON format) so apply casts as needed @@ -771,10 +703,7 @@ 
def remove_element(self): def run_extra_action(self, action: str): """Run an extra action""" - if action == "Extra: load sample": - self.load_sample(use_explicit_keys=True) - action = "Build" - elif action == "Extra: print": + if action == "Extra: print": frmt = self.__prompt.prompt( "Format ?", choices=["yaml", "json", "python (debug)"], required=True, default="yaml") self.print(frmt) @@ -795,10 +724,19 @@ def run_extra_action(self, action: str): with open(path, "wt", encoding="utf-8") as o_file: normalized_spec = PhaseSpecificationNormalizer().normalize(self.spec) if frmt == "json": - o_file.write(json.dumps(normalized_spec, sort_keys=True, indent=2, separators=(',', ": "))) + o_file.write( + json.dumps( + normalized_spec, + sort_keys=True, + indent=2, + separators=(',', ": "))) elif frmt == "yaml": o_file.write( - yaml.dump(normalized_spec, indent=2, Dumper=YamlSpecificationDumper, default_flow_style=None)) + yaml.dump( + normalized_spec, + indent=2, + Dumper=YamlSpecificationDumper, + default_flow_style=None)) def run_action(self, action: str): """Run an action""" @@ -856,7 +794,7 @@ def run(self): self.__args.interactive = True # Loop on interactive mode available actions - action: str = "Make Task" # default action is a sample + action: str = "Make Task" # default action is a Python specification while action != "Build JSON file": action = self.__prompt.prompt( "Choose an action ?", @@ -869,7 +807,6 @@ def run(self): "Build", "Extra: load file", "Extra: run", - "Extra: load sample", "Extra: print", "Extra: save", "Exit" diff --git a/src/lbaf/Utils/lbsJSONSpecFileMaker.py b/src/lbaf/Utils/lbsJSONSpecFileMaker.py new file mode 100644 index 00000000..1e6745cd --- /dev/null +++ b/src/lbaf/Utils/lbsJSONSpecFileMaker.py @@ -0,0 +1,224 @@ +import os +import yaml +from typing import Set, List, Union + +from lbaf.Execution.lbsPhaseSpecification import ( + PhaseSpecification, SharedBlockSpecification, + RankSpecification, TaskSpecification, + PhaseSpecificationNormalizer +) + +class JSONSpecFileMaker: + + ########################################################################### + ## Constructor + + def __init__(self): + self.tasks = {} + self.ranks = {} + self.comms = {} + self.shared_blocks = {} + + self.assignments = {} # { task_id: rank_id } + + self.current_ids = { + "seq": 0, + "shared": 0, + "rank": 0, + "phase": 0, + } + + self.id_sets = { + "seq": set(), + "shared": set(), + "rank": set(), + "phase": set(), + } + + + ########################################################################### + ## Private generators + + def generateNextID_(self, key: str) -> int: + """ + Generic function to generate and update the next available ID for a given category. + """ + if key not in self.current_ids or key not in self.id_sets: + raise ValueError(f"Invalid key: {key}") + + while self.current_ids[key] in self.id_sets[key]: + self.current_ids[key] += 1 + + next_id = self.current_ids[key] + self.current_ids[key] += 1 + return next_id + + def checkID_(self, try_id: int, key: str) -> int: + if try_id < 0: + try_id = self.generateNextID_(key) + elif try_id in self.id_sets[key]: + return self.checkID_(-1, key) + self.id_sets[key].add(try_id) + return try_id + + ########################################################################### + ## Private functions for assertions + + def assertIDExists_(self, test_id: int, id_type: str): + assert test_id in self.id_sets[id_type], \ + f"Task {test_id} has not been created yet. Use createTask() method."
+ + def assertAllTasksHaveBeenAssigned_(self): + for t in self.tasks: + assert t in self.assignments, \ + f"Task {t} has not been assigned. Call " \ + f"assignTask({t}, )" + + + + ########################################################################### + ## Public Functions for creating JSON fields + + def createTask(self, + time: float, + seq_id: int = -1, + collection_id: int = 7, + migratable: bool = True, + home = -1, + node = -1, + user_defined: dict = None) -> TaskSpecification: + seq_id = self.checkID_(seq_id, "seq") + task = TaskSpecification({ + "time": time, + "seq_id": seq_id, + "collection_id": collection_id, + "migratable": migratable, + "home": home, + "node": home if node == -1 else node, + }) + if user_defined is not None: + task["user_defined"] = user_defined + self.tasks[seq_id] = task + return task + + def createSharedBlock(self, + bytes: float, + tasks: Union[List[int], List[TaskSpecification]], + rank_id: int = -1, + id: int = -1): + shared_id = self.checkID_(id, "shared") + shared_block = SharedBlockSpecification({ + "size": bytes, + "shared_id": shared_id, + }) + if rank_id >= 0: + shared_block["home_rank"] = rank_id + # Option to assign tasks to shared block + task_specs = {} + if tasks is not None: + for t in tasks: + if isinstance(t, int): + task_specs[t] = self.tasks[t] + elif isinstance(t, dict): + task_specs[t["seq_id"]] = t + else: + raise RuntimeError( + f"tasks must be a list of ints (IDs) or TaskSpecifications") + shared_block["tasks"] = task_specs + self.shared_blocks[shared_id] = shared_block + return shared_block + + def createRank(self, + id: int = -1, + tasks: Union[List[int], List[TaskSpecification]] = None, + user_defined: dict = None + ) -> RankSpecification: + id = self.checkID_(id, "rank") + rank = RankSpecification({ + "id": id + }) + self.ranks[id] = rank + # Optionally add tasks to the rank + if tasks is not None: + for t in tasks: + self.assignTask(t, id) + + # Optionally add user_defined info to the rank + if user_defined is not None: + self.ranks[id]["user_defined"] = user_defined + + return self.ranks[id] + + def assignTask(self, + task: Union[int, TaskSpecification], + rank: Union[int, RankSpecification] + ) -> None: + """ + Both the `task` and `rank` can be passed as either of the following: + - the ID (int) + - the Task/Rank Specification (dict) + """ + # Determine the task ID (trivial if the task is given as the integer ID) + task_id = task if isinstance(task, int) else task["seq_id"] + # Determine the task spec (trivial if the task is given as the spec) + t = task if isinstance(task, dict) else self.tasks[task_id] + + # Determine the rank ID (trivial if the rank is given as the integer ID) + rank_id = rank if isinstance(rank, int) else rank["id"] + # Determine the rank spec (trivial if the rank is given as the spec) + r = rank if isinstance(rank, dict) else self.ranks[rank_id] + + self.assertIDExists_(task_id, "seq") + self.assertIDExists_(rank_id, "rank") + + # Make assignments + t["home"] = rank_id + if t["node"] == -1: + t["node"] = rank_id + if "tasks" not in r: + r["tasks"] = set() + r["tasks"].add(task_id) + + # If the task belongs to a shared_block, + # update the block's home rank as well + for sb in self.shared_blocks.values(): + if "home_rank" not in sb and task_id in sb.get("tasks", []): + sb["home_rank"] = rank_id + + # Keep track of assignment + self.assignments[task_id] = rank_id + + + ########################################################################### + ## Standard Getters + + def getRank(self, id: int) -> 
RankSpecification: + return self.ranks[id] + + def getTask(self, seq_id: int) -> TaskSpecification: + return self.tasks[seq_id] + + def getSharedBlock(self, id: int) -> SharedBlockSpecification: + return self.shared_blocks[id] + + ########################################################################### + ## Writer + + def write(self, path: str = None): + self.assertAllTasksHaveBeenAssigned_() + phase = PhaseSpecification({ + "tasks": self.tasks, + "shared_blocks": self.shared_blocks, + "communications": self.comms, + "ranks": self.ranks, + "id": self.checkID_(0, "phase") + }) + + norm = PhaseSpecificationNormalizer() + spec = norm.normalize(phase) + + # Write out the spec + path = os.path.join(os.getcwd(), "spec.yaml") if path is None else path + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'w') as output_file: + yaml.dump(spec, output_file, default_flow_style=False)
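The extended work model combines four terms: per the `AffineCombinationWorkModel.compute` docstring above, rank work is `alpha * load + beta * max(sent, received) + gamma + delta * homing`, where the homing cost returned by `Rank.get_homing()` is the total size of shared blocks whose home rank differs from the hosting rank. The configuration validator treats `delta` as optional and the model defaults it to 0.0, so configurations without a `delta` entry keep their previous behavior. A minimal sketch of the same arithmetic with free functions and illustrative names (not the class itself; shared blocks are plain dicts here for brevity):

```
def rank_work(alpha: float, beta: float, gamma: float, delta: float,
              load: float, v_received: float, v_sent: float, homing: float) -> float:
    """Affine combination: alpha * load + beta * max(sent, received) + gamma + delta * homing."""
    return alpha * load + beta * max(v_received, v_sent) + gamma + delta * homing

def homing_cost(rank_id: int, shared_blocks: list) -> float:
    """Total size of shared blocks hosted on rank_id but homed elsewhere."""
    return sum(b["size"] for b in shared_blocks if b["home_rank"] != rank_id)

# Example: one uprooted 4.0e6-byte block with delta = 0.1 adds 4.0e5 to the work.
blocks = [{"size": 4.0e6, "home_rank": 1}, {"size": 2.0e6, "home_rank": 0}]
print(rank_work(1.0, 0.0, 0.0, 0.1, 12.5, 0.0, 0.0, homing_cost(0, blocks)))  # 400012.5
```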
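The information stage now records a per-rank knowledge ratio kappa: with `n_r` ranks, a rank knowing `k` peers (itself included) gets `kappa = (k - 1) / (n_r - 1)`, which `Rank.set_kappa` requires to lie in [0, 1]. A small sketch of that computation, assuming `known_peers` maps each rank id to the set of rank ids it has learned about:

```
def knowledge_ratios(known_peers: dict, n_ranks: int) -> dict:
    """Return kappa per rank: peers known beyond itself, normalized by n_ranks - 1."""
    if n_ranks < 2:
        # Degenerate case: the algorithm logs a warning and reports a zero average.
        return {r: 0.0 for r in known_peers}
    factor = 1.0 / (n_ranks - 1.0)
    return {r: factor * (len(peers) - 1) for r, peers in known_peers.items()}

# With 4 ranks, a rank that knows itself and two others has kappa = 2/3.
print(knowledge_ratios({0: {0, 1, 2}, 1: {1}}, 4))  # {0: 0.666..., 1: 0.0}
```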
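The brute-force path now threads `delta` through `compute_arrangement_works` and `compute_min_max_arrangements_work`, although the homing term is not evaluated there yet (a positive `delta` with a logger attached aborts). A hedged usage sketch of the per-arrangement work computation, assuming the package is importable as `lbaf` and using toy objects that expose only `get_load()`, which is sufficient when `beta == 0`:

```
from lbaf.IO.lbsStatistics import compute_arrangement_works

class ToyObject:
    """Toy stand-in for an LBAF object; only the load term is exercised here."""
    def __init__(self, load: float):
        self._load = load
    def get_load(self) -> float:
        return self._load

objects = tuple(ToyObject(t) for t in (1.0, 2.0, 3.5))
# Arrangement maps object index to rank id: objects 0 and 2 on rank 0, object 1 on rank 1.
works = compute_arrangement_works(objects, (0, 1, 0), alpha=1.0, beta=0.0, gamma=0.0, delta=0.0)
print(works)  # expected: {0: 4.5, 1: 2.0}
```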
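The restructured `ClusteringTransferStrategy.execute` makes the subclustering decision explicit: it is skipped for every rank when `max_subclusters == 0`, runs inline right after the cluster swaps when subclustering is not separated, and otherwise is deferred to a second pass over all ranks, which is itself skipped in the non-deterministic case once any swaps have succeeded. A condensed restatement of that decision logic (a sketch, not the strategy code):

```
def subclustering_mode(separate: bool, max_subclusters: int,
                       n_swaps: int, deterministic: bool) -> str:
    """Summarize when subcluster transfers run in the clustering strategy."""
    if max_subclusters == 0:
        return "skipped for all ranks"
    if not separate:
        return "inline, per rank, after cluster swaps"
    if n_swaps and not deterministic:
        return "deferred pass skipped: swaps already succeeded"
    return "deferred second pass over all ranks"

print(subclustering_mode(separate=True, max_subclusters=10, n_swaps=0, deterministic=False))
```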
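The new `JSONSpecFileMaker` assembles a `PhaseSpecification` programmatically: tasks are created first, optional shared blocks reference them, ranks receive tasks through `assignTask` (which also settles each task's home and node and the block's home rank), and `write` normalizes the result and dumps it to disk. A hedged usage sketch; the import path and the output location are assumptions, and `write` as added here emits YAML even though the class name says JSON:

```
from lbaf.Utils.lbsJSONSpecFileMaker import JSONSpecFileMaker

maker = JSONSpecFileMaker()

# Two tasks; sequential IDs are auto-generated when seq_id is left at its default.
t0 = maker.createTask(time=2.0)
t1 = maker.createTask(time=3.5)

# A shared block accessed by both tasks; its home rank is settled on assignment.
maker.createSharedBlock(bytes=10000.0, tasks=[t0, t1])

# One rank hosting both tasks; createRank calls assignTask for each of them.
maker.createRank(id=0, tasks=[t0, t1])

maker.write("output/spec.yaml")
```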