Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
285e66a
#558: base of reconstructed PR post conflict resolution and integrati…
ppebay Dec 30, 2024
b92871b
#558: whitespace cleanup
ppebay Dec 30, 2024
b988409
#558: fixed incomplete type
ppebay Dec 30, 2024
9f6f3e3
#558: progress checkpoint where nodes and node level memory values ar…
ppebay Jan 13, 2025
134ab85
#558: whitespace cleanup
ppebay Jan 13, 2025
75b326c
#558: fix CI failures
cwschilly Jan 13, 2025
8d92f0b
#558: add unit test for lbsNode
cwschilly Jan 13, 2025
6a8c2fb
#558: small syntax improvement
cwschilly Jan 13, 2025
84f9dba
#558: add space
cwschilly Jan 13, 2025
2e059f8
#558: parentheses needed in test when walrus operator is used
ppebay Jan 14, 2025
8873353
#558: improved and enriched statistics printouts
ppebay Jan 14, 2025
ab31f64
#558: added node getter and reporting node max memory usage properly …
ppebay Jan 14, 2025
d9cfcc1
#558: better initialization
ppebay Jan 14, 2025
19bd77b
#558: additional and better testing
ppebay Jan 14, 2025
702bdd5
#558: completed implementation fixing remaining shared blocks bug
ppebay Jan 14, 2025
618179f
#558: removed now unused set_shared_blocks from tests
ppebay Jan 15, 2025
cabace4
#558: fix CI issues
ppebay Jan 15, 2025
b587abd
#558: WS cleanup
ppebay Jan 15, 2025
dab55d1
#558: fix unit test failures
cwschilly Jan 15, 2025
1c333e6
#558: fix formatting and delete unnecessary block
cwschilly Jan 15, 2025
5895b20
#558: use copy for rank objects instead of deepcopy; other small fixes
cwschilly Jan 16, 2025
906114f
#558: create new ranks set directly as member variable
cwschilly Jan 16, 2025
f729b61
#558: use configuration 1 for load-only test case
cwschilly Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Specify input
from_data:
data_stem: ../data/synthetic-blocks/synthetic-dataset-blocks
ranks_per_node: 2
phase_ids:
- 0
check_schema: false
Expand All @@ -12,6 +13,8 @@ work_model:
alpha: 1.0
beta: 0.0
gamma: 0.0
upper_bounds:
max_memory_usage: 45.0

# Specify algorithm
brute_force_optimization: true
Expand All @@ -31,21 +34,20 @@ algorithm:
# Specify output
output_dir: ../output
output_file_stem: synthetic-dataset-blocks
write_JSON:
compressed: false
suffix: json
communications: true
offline_lb_compatible: true
lb_iterations: true
visualization:
x_ranks: 2
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
object_qoi: load
object_qoi: shared_id
save_meshes: true
force_continuous_object_qoi: true
output_visualization_dir: ../output
output_visualization_file_stem: output_file

write_JSON:
compressed: false
suffix: json
communications: true
offline_lb_compatible: true
lb_iterations: true
13 changes: 12 additions & 1 deletion config/user-defined-memory-toy-problem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,20 @@ algorithm:

# Specify output
output_dir: ../output
output_file_stem: output_file
output_file_stem: user-defined-memory-toy-problem
write_JSON:
compressed: true
suffix: json
communications: true
offline_lb_compatible: true
visualization:
x_ranks: 2
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
object_qoi: shared_id
save_meshes: true
force_continuous_object_qoi: true
output_visualization_dir: ../output
output_visualization_file_stem: output_file
39 changes: 27 additions & 12 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class InternalParameters:

# From data input options
data_stem: Optional[str] = None
ranks_per_node: Optional[int] = 1

# From samplers input options
n_ranks: Optional[int] = None
Expand Down Expand Up @@ -150,6 +151,8 @@ def init_parameters(self, config: dict, base_dir: str):
else:
self.phase_ids = from_data.get("phase_ids")
self.expected_ranks = from_data.get("expected_ranks")
if (rpn := from_data.get("ranks_per_node")) is not None:
self.ranks_per_node = int(rpn)

# Parse sampling parameters if present
from_samplers = config.get("from_samplers")
Expand Down Expand Up @@ -291,7 +294,7 @@ def __configure(self, *config_path):
:returns: The configuration as a dictionary
"""

# merge configurations
# Merge configurations
config = self.__merge_configurations(*config_path)

# Change logger (with parameters from the configuration)
Expand Down Expand Up @@ -322,6 +325,7 @@ def __configure(self, *config_path):
self.__parameters.output_file_stem,
self.__parameters.json_params) if self.__parameters.json_params else None

# Return configuration
return config

def __resolve_config_path(self, config_path) -> str:
Expand All @@ -333,10 +337,8 @@ def __resolve_config_path(self, config_path) -> str:
"""
# Search config file in the current working directory if relative
path = config_path
path_list = []
path_list.append(path)
if (
path is not None and
path_list = [path]
if (path is not None and
not os.path.isfile(path) and
not os.path.isabs(config_path) and PROJECT_PATH is not None
):
Expand Down Expand Up @@ -372,33 +374,38 @@ def __print_statistics(self, phase: Phase, phase_name: str, work_model: WorkMode
lambda x: x.get_size(),
f"{phase_name} rank working memory",
self.__logger)
shared_memory_stats = lbstats.print_function_statistics(
r_shared_mem_stats = lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_shared_memory(),
f"{phase_name} rank shared memory",
self.__logger)
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_max_memory_usage(),
f"{phase_name} maximum memory usage",
f"{phase_name} rank maximum memory usage",
self.__logger)
if shared_memory_stats.get_maximum():
lbstats.print_function_statistics(
phase.get_nodes(),
lambda x: x.get_max_memory_usage(),
f"{phase_name} node maximum memory usage",
self.__logger)
if r_shared_mem_stats.get_maximum():
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_homed_blocks_ratio(),
f"{phase_name} homed_blocks_ratio",
f"{phase_name} homed blocks ratio",
self.__logger)
lbstats.print_function_statistics(
phase.get_ranks(),
lambda x: x.get_number_of_uprooted_blocks(),
f"{phase_name} number_of_uprooted_blocks",
f"{phase_name} number of uprooted blocks",
self.__logger)

# Print edge statistics
lbstats.print_function_statistics(
phase.get_edge_maxima().values(),
lambda x: x,
f"{phase_name} sent volumes",
f"{phase_name} inter-rank sent volumes",
self.__logger)

if work_model is not None:
Expand Down Expand Up @@ -515,7 +522,8 @@ def run(self, cfg=None, cfg_dir=None):
logger=self.__logger,
file_suffix=file_suffix if file_suffix is not None else "json",
check_schema=check_schema,
expected_ranks=self.__parameters.expected_ranks)
expected_ranks=self.__parameters.expected_ranks,
ranks_per_node=self.__parameters.ranks_per_node)

# Retrieve n_ranks
n_ranks = reader.n_ranks
Expand Down Expand Up @@ -558,6 +566,9 @@ def run(self, cfg=None, cfg_dir=None):
a_min_max = []

# Instantiate runtime
if self.__parameters.ranks_per_node > 1 and (
wmp := self.__parameters.work_model.get("parameters")):
wmp["node_bounds"] = True
runtime = Runtime(
phases,
self.__parameters.work_model,
Expand Down Expand Up @@ -645,19 +656,23 @@ def run(self, cfg=None, cfg_dir=None):
self.__parameters.grid_size[1] *
self.__parameters.grid_size[2] )

# Execute vt-tv
vttv.tvFromJson(ranks_json_str, str(vttv_params), num_ranks)

# Report on rebalanced phase when available
if rebalanced_phase:
l_stats, w_stats = self.__print_statistics(
rebalanced_phase, "rebalanced", runtime.get_work_model())

# Save final load imbalance to file
with open(
"imbalance.txt" if self.__parameters.output_dir is None
else os.path.join(
self.__parameters.output_dir,
"imbalance.txt"), 'w', encoding="utf-8") as imbalance_file:
imbalance_file.write(f"{l_stats.get_imbalance()}")

# Save final maximum work to file
with open(
"w_max.txt" if self.__parameters.output_dir is None
else os.path.join(
Expand Down
1 change: 1 addition & 0 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from typing import List

from ..IO.lbsStatistics import compute_function_statistics
from ..Model.lbsNode import Node
from ..Model.lbsPhase import Phase
from ..Model.lbsWorkModelBase import WorkModelBase
from ..Utils.lbsLogging import Logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def _try_bin(self, ranks, max_rank, tbin, size, sid, objs):

selected_load = o.get_load()

# If our situation is not made worse and fits under memory constraints, do the transer
# If our situation is not made worse and fits under memory constraints, do the transfer
if (sid in min_rank.get_shared_ids() or \
len(min_rank.get_shared_ids()) < self._max_shared_ids) and \
min_rank.get_load() + selected_load < max_rank.get_load():
Expand Down
4 changes: 4 additions & 0 deletions src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ def __init__(self, config_to_validate: dict, logger: Logger):
error="Should be of type 'list' of 'int' types"),
Regex(r"^[0-9]+-[0-9]+$", error="Should be of type 'str' like '0-100'")),
Optional("expected_ranks"): And(
int,
lambda x: x > 0,
error="Should be of type 'int' and > 0"),
Optional("ranks_per_node"): And(
int,
lambda x: x > 0,
error="Should be of type 'int' and > 0")
Expand Down
67 changes: 47 additions & 20 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@
from ..Model.lbsObject import Object
from ..Model.lbsObjectCommunicator import ObjectCommunicator
from ..Model.lbsRank import Rank
from ..Model.lbsNode import Node

class LoadReader:

Check notice on line 59 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many instance attributes (11/7) (too-many-instance-attributes)
"""A class to read VT Object Map files. These json files could be compressed with Brotli.

Each file is named as <base-name>.<node>.json, where <node> spans the number of MPI ranks that VT is utilizing.
Expand All @@ -75,7 +76,13 @@
}

def __init__(
self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None):
self,
file_prefix: str,
logger: Logger,
file_suffix: str="json",
check_schema: bool=True,
expected_ranks=None,
ranks_per_node: int=1):
# The base directory and file name for the log files
self.__file_prefix = file_prefix

Expand All @@ -98,27 +105,36 @@
manager = Manager()
self.__metadata = manager.dict()

# imported JSON_data_files_validator module (lazy import)
# Lazy import of JSON_data_files_validator module
if LoadReader.SCHEMA_VALIDATOR_CLASS is None:
# pylint:disable=import-outside-toplevel
from ..imported.JSON_data_files_validator import \
SchemaValidator as \
sv
LoadReader.SCHEMA_VALIDATOR_CLASS = sv

# determine the number of ranks
# Determine the number of ranks
self.n_ranks = self._get_n_ranks()
self.__logger.info(f"Number of ranks: {self.n_ranks}")

# warn user if expected_ranks is set and is different from n_ranks
# Initialize nodes
self.__nodes = []
self.ranks_per_node = ranks_per_node
self.__logger.info(f"Ranks per node: {ranks_per_node}")
if self.ranks_per_node > 1:
n_nodes = int(self.n_ranks // self.ranks_per_node)
self.__nodes = [
Node(self.__logger, i) for i in range(0, n_nodes)]

# Warn user if expected_ranks is set and is different from n_ranks
if self.expected_ranks is not None and self.expected_ranks != self.n_ranks:
self.__logger.warn(f"Unexpected number of ranks ({self.expected_ranks} was expected)")

# init vt data
# Initialize vt data
self.__vt_data = {}

# Load vt data concurrently with one file per rank
if self.n_ranks > 0:
# Load vt data concurrently from rank 1 to n_ranks
with Pool(context=get_context("fork")) as pool:
results = pool.imap_unordered(
self._load_vt_file, range(0, self.n_ranks))
Expand All @@ -133,22 +149,24 @@
raise SystemExit(1)

def _get_n_ranks(self):
"""Determine the number of ranks automatically.
This uses the first applicable method in the following methods:
List all data file names matching {file_prefix}.{rank_id}.{file_suffix}
pattern and return max(rank_id) + 1.
"""
"""Determine the number of ranks automatically."""

# or default detect data files with pattern
# Assemble data directory name
data_dir = f"{os.sep}".join(self.__file_prefix.split(os.sep)[:-1])
pattern = re.compile(rf"^{self.__file_prefix}.(\d+).{self.__file_suffix}$")

# Default highest rank ID to zero
highest_rank = 0

# Look for pattern matching {file_prefix}.{rank_id}.{file_suffix}
pattern = re.compile(rf"^{self.__file_prefix}.(\d+).{self.__file_suffix}$")
for name in os.listdir(data_dir):
path = os.path.join(data_dir, name)
match_result = pattern.search(path)
if match_result:
rank_id = int(match_result.group(1))
highest_rank = max(highest_rank, rank_id)

# Return number of ranks
return highest_rank + 1

def _get_rank_file_name(self, rank_id: int):
Expand Down Expand Up @@ -199,7 +217,7 @@
# Return rank ID and data dictionary
return rank_id, decompressed_dict

def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:

Check notice on line 220 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many local variables (38/15) (too-many-locals)

Check notice on line 220 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many branches (19/12) (too-many-branches)

Check notice on line 220 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many statements (66/50) (too-many-statements)
""" Populate rank and its communicator in phase using the JSON content."""

# Seek phase with given ID
Expand All @@ -226,7 +244,7 @@

# Add communications when available
rank_comm = {}
if (communications := phase.get("communications")):

Check warning on line 247 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Using possibly undefined loop variable 'phase' (undefined-loop-variable)
if phase_id in self.__communications_dict:
self.__communications_dict[phase_id][rank_id] = communications
else:
Expand Down Expand Up @@ -266,15 +284,23 @@
# No communications for this phase
self.__communications_dict.setdefault(phase_id, {rank_id: {}})

# Instantiante rank for current phase

# Create phase rank
phase_rank = Rank(self.__logger, rank_id)
phase_rank.set_metadata(self.__metadata[rank_id])

# Create node when required
rank_node = None
if self.ranks_per_node > 1:
rank_node = self.__nodes[rank_id // self.ranks_per_node]
rank_node.add_rank(phase_rank)
phase_rank.set_node(rank_node)

# Initialize storage for shared blocks information
rank_blocks, task_user_defined = {}, {}

# Iterate over tasks
for task in phase.get("tasks", []):

Check warning on line 303 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Using possibly undefined loop variable 'phase' (undefined-loop-variable)
# Retrieve required values
task_entity = task.get("entity")
task_id = task_entity.get("id")
Expand Down Expand Up @@ -324,32 +350,33 @@
# Set rank-level memory quantities of interest
phase_rank.set_size(
task_user_defined.get("rank_working_bytes", 0.0))
shared_blocks = set()
for b_id, (b_size, objects) in rank_blocks.items():
# Create and add new block
shared_blocks.add(block := Block(
# Create new block
block = Block(
b_id, h_id=rank_id, size=b_size,
o_ids={o.get_id() for o in objects}))
o_ids={o.get_id() for o in objects})

# Assign block to objects attached to it
for o in objects:
o.set_shared_block(block)
phase_rank.set_shared_blocks(shared_blocks)

# Returned rank and communicators per phase
return phase_rank, rank_comm

def populate_phase(self, phase_id: int) -> List[Rank]:

Check notice on line 366 in src/lbaf/IO/lbsVTDataReader.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many local variables (18/15) (too-many-locals)
""" Populate phase using the JSON content."""

# Create storage for ranks
ranks: List[Rank] = [None] * self.n_ranks

# Create storage for communications
communications = {}

# Iterate over all ranks
for rank_id in range(self.n_ranks):
# Read data for given phase and assign it to rank
ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id)
ranks[rank_id], rank_comm = self._populate_rank(
phase_id, rank_id)

# Merge rank communication with existing ones
for k, v in rank_comm.items():
Expand Down
Loading
Loading