Skip to content

#586: Investigate erratic and unpredictable CI error regarding synthetic case (NEW) #590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/code-quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install tox-gh-actions
pip list

- name: Register Github Actions Problem matchers (pylint)
run: |
Expand Down
2 changes: 1 addition & 1 deletion config/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ algorithm:
order_strategy: element_id
transfer_strategy: Recursive
criterion: Tempered
max_objects_per_transfer: 8
max_objects_per_transfer: 1
deterministic_transfer: true

# Specify output
Expand Down
3 changes: 3 additions & 0 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger):
# Keep track of phase communications
self._initial_communications = {}

# Assign optional parameters
self._deterministic_transfer = parameters.get("deterministic_transfer", False)

# Map rank statistics to their respective computation methods
self.__statistics = {
("ranks", lambda x: x.get_load()): {
Expand Down
2 changes: 2 additions & 0 deletions src/lbaf/Execution/lbsCentralizedPrefixOptimizerAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def execute(self, p_id: int, phases: list, statistics: dict, _):
# Prepare input data for rank order enumerator
self._logger.info("Starting optimizer")
phase_ranks = self._phase.get_ranks()
if self._deterministic_transfer:
phase_ranks = sorted(phase_ranks, key=lambda r: r.get_id())

# Initialize max shared ID
max_shared_ids = 0
Expand Down
5 changes: 4 additions & 1 deletion src/lbaf/Execution/lbsClusteringTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
rank_targets = self._get_ranks_to_traverse(phase.get_ranks(), known_peers)
phase_ranks = phase.get_ranks()
if self._deterministic_transfer:
phase_ranks = sorted(phase_ranks, key=lambda r: r.get_id())
rank_targets = self._get_ranks_to_traverse(phase_ranks, known_peers)

# Iterate over ranks
for r_src, targets in rank_targets.items():
Expand Down
18 changes: 10 additions & 8 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,23 @@ def __forward_message(self, i: int, r_snd: Rank, f: int):
def __execute_information_stage(self):
"""Execute information stage."""
# Build set of all ranks in the phase
rank_set = set(self._rebalanced_phase.get_ranks())
phase_ranks = self._rebalanced_phase.get_ranks()
if self._deterministic_transfer:
phase_ranks = sorted(phase_ranks, key=lambda r: r.get_id())

# Initialize information messages and known peers
messages, self.__known_peers = {}, {}
n_r = len(rank_set)
for r_snd in rank_set:
n_r = len(phase_ranks)
for r_snd in phase_ranks:
# Make rank aware of itself
self.__known_peers[r_snd] = {r_snd}

# Create initial message spawned from rank
msg = Message(0, {r_snd})

# Broadcast message to random sample of ranks excluding self
for r_rcv in random.sample(
list(rank_set.difference({r_snd})), min(self.__fanout, n_r - 1)):
other_ranks = [r for r in phase_ranks if r!= r_snd] if self._deterministic_transfer else phase_ranks.difference({r_snd})
for r_rcv in random.sample(other_ranks, min(self.__fanout, n_r - 1)):
messages.setdefault(r_rcv, []).append(msg)

# Sanity check prior to forwarding iterations
Expand All @@ -176,7 +178,7 @@ def __execute_information_stage(self):

# Perform sanity check on first round of information aggregation
n_k = 0
for r in rank_set:
for r in phase_ranks:
# Retrieve and tally peers known to rank
k_p = self.__known_peers.get(r, {})
n_k += len(k_p)
Expand All @@ -193,7 +195,7 @@ def __execute_information_stage(self):
messages.clear()

# Iterate over all ranks
for r_snd in rank_set:
for r_snd in phase_ranks:
# Collect message when destination list is not empty
dst, msg = self.__forward_message(
i, r_snd, self.__fanout)
Expand All @@ -206,7 +208,7 @@ def __execute_information_stage(self):
self.__process_message(r_rcv, m)

# Report on known peers when requested
for rank in rank_set:
for rank in phase_ranks:
self._logger.debug(
f"Peers known to rank {rank.get_id()}: {[r_k.get_id() for r_k in k_p]}")

Expand Down
8 changes: 5 additions & 3 deletions src/lbaf/Execution/lbsRecursiveTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ def execute(self, known_peers, phase: Phase, ave_load: float, _):
max_obj_transfers = 0

# Map rank to targets and ordered migratable objects
ranks = phase.get_ranks()
rank_targets = self._get_ranks_to_traverse(ranks, known_peers)
phase_ranks = phase.get_ranks()
if self._deterministic_transfer:
phase_ranks = sorted(phase_ranks, key=lambda r: r.get_id())
rank_targets = self._get_ranks_to_traverse(phase_ranks, known_peers)

# Iterate over traversable ranks
for r_src, targets in rank_targets.items():
Expand Down Expand Up @@ -172,7 +174,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float, _):
# Return transfer phase counts
self._logger.info(
f"Maximum number of objects transferred at once: {max_obj_transfers}")
return len(ranks) - len(rank_targets), self._n_transfers, self._n_rejects
return len(phase_ranks) - len(rank_targets), self._n_transfers, self._n_rejects

@staticmethod
def arbitrary(objects: set, _):
Expand Down
3 changes: 2 additions & 1 deletion src/lbaf/Execution/lbsTransferStrategyBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import math
import random
from logging import Logger
from typing import Union, Set, List

from ..IO.lbsStatistics import inverse_transform_sample
from ..Execution.lbsCriterionBase import CriterionBase
Expand Down Expand Up @@ -99,7 +100,7 @@ def _initialize_transfer_stage(self, ave_load: float):
self._n_transfers = 0
self._n_rejects = 0

def _get_ranks_to_traverse(self, ranks: list, known_peers: dict) -> dict:
def _get_ranks_to_traverse(self, ranks: Union[List, Set], known_peers: dict) -> dict:
"""Prepare randomized dict of ranks to transfer targets."""

# Initialize dictionary of traversable ranks to targets
Expand Down
5 changes: 3 additions & 2 deletions src/lbaf/IO/lbsStatistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
class Statistics:
"""A class storing descriptive statistics."""

def __init__(

Check notice on line 56 in src/lbaf/IO/lbsStatistics.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many arguments (8/7) (too-many-arguments)
self,
n: int,
mini: float,
Expand Down Expand Up @@ -427,8 +427,9 @@
logger)

# Print more detailed information if requested
for i, v in enumerate(values):
logger.debug(f"\t{i}: {function(v)}")
for v in values:
id = f"{v.get_id()}: " if hasattr(v, "get_id") and callable(getattr(v, "get_id")) else ""

Check warning on line 431 in src/lbaf/IO/lbsStatistics.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Redefining built-in 'id' (redefined-builtin)
logger.debug(f"\t{id}{function(v)}")

# Return descriptive statistics instance
return stats
Expand Down
4 changes: 2 additions & 2 deletions src/lbaf/Model/lbsPhase.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def __init__(
# Sub-index of this phase e.g. for load balancing iteration
self.__phase_sub_id = p_sub_id

# Initialize empty list of ranks
self.__ranks = []
# Initialize empty set of ranks
self.__ranks: Set[Rank] = set()

# Initialize phase communication dict
self.__communications = {}
Expand Down
4 changes: 2 additions & 2 deletions tests/acceptance/config/synthetic-acceptance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ algorithm:
order_strategy: element_id
transfer_strategy: Recursive
criterion: Tempered
max_objects_per_transfer: 2
max_objects_per_transfer: 1
deterministic_transfer: true

# Specify output
logging_level: info
logging_level: debug
output_dir: ../output
output_file_stem: output_file
2 changes: 1 addition & 1 deletion tests/unit/Model/test_lbs_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def setUp(self):
self.phase = Phase(self.logger, 0, reader=self.reader)

def test_lbs_phase_initialization(self):
self.assertEqual(self.phase._Phase__ranks, [])
self.assertEqual(self.phase._Phase__ranks, set())
self.assertEqual(self.phase._Phase__phase_id, 0)
self.assertEqual(self.phase._Phase__edges, None)

Expand Down
Loading