Skip to content

Commit 2050931

Browse files
ppebay and cwschilly authored
#558: Restructured PR to add node level memory constraint post recent changes (#581)
* #558: base of reconstructed PR post conflict resolution and integration of changes * #558: whitespace cleanup * #558: fixed incomplete type * #558: progress checkpoint where nodes and node level memory values are correct * #558: whitespace cleanup * #558: fix CI failures * #558: add unit test for lbsNode * #558: small syntax improvement * #558: add space * #558: parentheses needed in test when walrus operator is used * #558: improved and enriched statistics printouts * #558: added node getter and reporting node max memory usage properly too now * #558: better initialization * #558: additional and better testing * #558: completed implementation fixing remaining shared blocks bug * #558: removed now unused set_shared_blocks from tests * #558: fix CI issues * #558: WS cleanup * #558: fix unit test failures * #558: fix formatting and delete unnecessary block * #558: use copy for rank objects instead of deepcopy; other small fixes * #558: create new ranks set directly as member variable * #558: use configuration 1 for load-only test case --------- Co-authored-by: Caleb Schilly <[email protected]>
1 parent b656fac commit 2050931

20 files changed

+379
-138
lines changed

config/synthetic-blocks.yaml

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Specify input
22
from_data:
33
data_stem: ../data/synthetic-blocks/synthetic-dataset-blocks
4+
ranks_per_node: 2
45
phase_ids:
56
- 0
67
check_schema: false
@@ -12,6 +13,8 @@ work_model:
1213
alpha: 1.0
1314
beta: 0.0
1415
gamma: 0.0
16+
upper_bounds:
17+
max_memory_usage: 45.0
1518

1619
# Specify algorithm
1720
brute_force_optimization: true
@@ -32,21 +35,20 @@ algorithm:
3235
# Specify output
3336
output_dir: ../output
3437
output_file_stem: synthetic-dataset-blocks
38+
write_JSON:
39+
compressed: false
40+
suffix: json
41+
communications: true
42+
offline_lb_compatible: true
43+
lb_iterations: true
3544
visualization:
3645
x_ranks: 2
3746
y_ranks: 2
3847
z_ranks: 1
3948
object_jitter: 0.5
4049
rank_qoi: load
41-
object_qoi: load
50+
object_qoi: shared_id
4251
save_meshes: true
4352
force_continuous_object_qoi: true
4453
output_visualization_dir: ../output
4554
output_visualization_file_stem: output_file
46-
47-
write_JSON:
48-
compressed: false
49-
suffix: json
50-
communications: true
51-
offline_lb_compatible: true
52-
lb_iterations: true

config/user-defined-memory-toy-problem.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,20 @@ algorithm:
3131

3232
# Specify output
3333
output_dir: ../output
34-
output_file_stem: output_file
34+
output_file_stem: user-defined-memory-toy-problem
3535
write_JSON:
3636
compressed: true
3737
suffix: json
3838
communications: true
3939
offline_lb_compatible: true
40+
visualization:
41+
x_ranks: 2
42+
y_ranks: 2
43+
z_ranks: 1
44+
object_jitter: 0.5
45+
rank_qoi: load
46+
object_qoi: shared_id
47+
save_meshes: true
48+
force_continuous_object_qoi: true
49+
output_visualization_dir: ../output
50+
output_visualization_file_stem: output_file

src/lbaf/Applications/LBAF_app.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class InternalParameters:
8888

8989
# From data input options
9090
data_stem: Optional[str] = None
91+
ranks_per_node: Optional[int] = 1
9192

9293
# From samplers input options
9394
n_ranks: Optional[int] = None
@@ -150,6 +151,8 @@ def init_parameters(self, config: dict, base_dir: str):
150151
else:
151152
self.phase_ids = from_data.get("phase_ids")
152153
self.expected_ranks = from_data.get("expected_ranks")
154+
if (rpn := from_data.get("ranks_per_node")) is not None:
155+
self.ranks_per_node = int(rpn)
153156

154157
# Parse sampling parameters if present
155158
from_samplers = config.get("from_samplers")
@@ -291,7 +294,7 @@ def __configure(self, *config_path):
291294
:returns: The configuration as a dictionary
292295
"""
293296

294-
# merge configurations
297+
# Merge configurations
295298
config = self.__merge_configurations(*config_path)
296299

297300
# Change logger (with parameters from the configuration)
@@ -322,6 +325,7 @@ def __configure(self, *config_path):
322325
self.__parameters.output_file_stem,
323326
self.__parameters.json_params) if self.__parameters.json_params else None
324327

328+
# Return configuration
325329
return config
326330

327331
def __resolve_config_path(self, config_path) -> str:
@@ -333,10 +337,8 @@ def __resolve_config_path(self, config_path) -> str:
333337
"""
334338
# Search config file in the current working directory if relative
335339
path = config_path
336-
path_list = []
337-
path_list.append(path)
338-
if (
339-
path is not None and
340+
path_list = [path]
341+
if (path is not None and
340342
not os.path.isfile(path) and
341343
not os.path.isabs(config_path) and PROJECT_PATH is not None
342344
):
@@ -372,33 +374,38 @@ def __print_statistics(self, phase: Phase, phase_name: str, work_model: WorkMode
372374
lambda x: x.get_size(),
373375
f"{phase_name} rank working memory",
374376
self.__logger)
375-
shared_memory_stats = lbstats.print_function_statistics(
377+
r_shared_mem_stats = lbstats.print_function_statistics(
376378
phase.get_ranks(),
377379
lambda x: x.get_shared_memory(),
378380
f"{phase_name} rank shared memory",
379381
self.__logger)
380382
lbstats.print_function_statistics(
381383
phase.get_ranks(),
382384
lambda x: x.get_max_memory_usage(),
383-
f"{phase_name} maximum memory usage",
385+
f"{phase_name} rank maximum memory usage",
384386
self.__logger)
385-
if shared_memory_stats.get_maximum():
387+
lbstats.print_function_statistics(
388+
phase.get_nodes(),
389+
lambda x: x.get_max_memory_usage(),
390+
f"{phase_name} node maximum memory usage",
391+
self.__logger)
392+
if r_shared_mem_stats.get_maximum():
386393
lbstats.print_function_statistics(
387394
phase.get_ranks(),
388395
lambda x: x.get_homed_blocks_ratio(),
389-
f"{phase_name} homed_blocks_ratio",
396+
f"{phase_name} homed blocks ratio",
390397
self.__logger)
391398
lbstats.print_function_statistics(
392399
phase.get_ranks(),
393400
lambda x: x.get_number_of_uprooted_blocks(),
394-
f"{phase_name} number_of_uprooted_blocks",
401+
f"{phase_name} number of uprooted blocks",
395402
self.__logger)
396403

397404
# Print edge statistics
398405
lbstats.print_function_statistics(
399406
phase.get_edge_maxima().values(),
400407
lambda x: x,
401-
f"{phase_name} sent volumes",
408+
f"{phase_name} inter-rank sent volumes",
402409
self.__logger)
403410

404411
if work_model is not None:
@@ -515,7 +522,8 @@ def run(self, cfg=None, cfg_dir=None):
515522
logger=self.__logger,
516523
file_suffix=file_suffix if file_suffix is not None else "json",
517524
check_schema=check_schema,
518-
expected_ranks=self.__parameters.expected_ranks)
525+
expected_ranks=self.__parameters.expected_ranks,
526+
ranks_per_node=self.__parameters.ranks_per_node)
519527

520528
# Retrieve n_ranks
521529
n_ranks = reader.n_ranks
@@ -558,6 +566,9 @@ def run(self, cfg=None, cfg_dir=None):
558566
a_min_max = []
559567

560568
# Instantiate runtime
569+
if self.__parameters.ranks_per_node > 1 and (
570+
wmp := self.__parameters.work_model.get("parameters")):
571+
wmp["node_bounds"] = True
561572
runtime = Runtime(
562573
phases,
563574
self.__parameters.work_model,
@@ -645,19 +656,23 @@ def run(self, cfg=None, cfg_dir=None):
645656
self.__parameters.grid_size[1] *
646657
self.__parameters.grid_size[2] )
647658

659+
# Execute vt-tv
648660
vttv.tvFromJson(ranks_json_str, str(vttv_params), num_ranks)
649661

650662
# Report on rebalanced phase when available
651663
if rebalanced_phase:
652664
l_stats, w_stats = self.__print_statistics(
653665
rebalanced_phase, "rebalanced", runtime.get_work_model())
666+
667+
# Save final load imbalance to file
654668
with open(
655669
"imbalance.txt" if self.__parameters.output_dir is None
656670
else os.path.join(
657671
self.__parameters.output_dir,
658672
"imbalance.txt"), 'w', encoding="utf-8") as imbalance_file:
659673
imbalance_file.write(f"{l_stats.get_imbalance()}")
660674

675+
# Save final maximum work to file
661676
with open(
662677
"w_max.txt" if self.__parameters.output_dir is None
663678
else os.path.join(

src/lbaf/Execution/lbsAlgorithmBase.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from typing import List
4545

4646
from ..IO.lbsStatistics import compute_function_statistics
47+
from ..Model.lbsNode import Node
4748
from ..Model.lbsPhase import Phase
4849
from ..Model.lbsWorkModelBase import WorkModelBase
4950
from ..Utils.lbsLogging import Logger

src/lbaf/Execution/lbsCentralizedPrefixOptimizerAlgorithm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def _try_bin(self, ranks, max_rank, tbin, size, sid, objs):
228228

229229
selected_load = o.get_load()
230230

231-
# If our situation is not made worse and fits under memory constraints, do the transer
231+
# If our situation is not made worse and fits under memory constraints, do the transfer
232232
if (sid in min_rank.get_shared_ids() or \
233233
len(min_rank.get_shared_ids()) < self._max_shared_ids) and \
234234
min_rank.get_load() + selected_load < max_rank.get_load():

src/lbaf/IO/lbsConfigurationValidator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ def __init__(self, config_to_validate: dict, logger: Logger):
146146
error="Should be of type 'list' of 'int' types"),
147147
Regex(r"^[0-9]+-[0-9]+$", error="Should be of type 'str' like '0-100'")),
148148
Optional("expected_ranks"): And(
149+
int,
150+
lambda x: x > 0,
151+
error="Should be of type 'int' and > 0"),
152+
Optional("ranks_per_node"): And(
149153
int,
150154
lambda x: x > 0,
151155
error="Should be of type 'int' and > 0")

src/lbaf/IO/lbsVTDataReader.py

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from ..Model.lbsObject import Object
5555
from ..Model.lbsObjectCommunicator import ObjectCommunicator
5656
from ..Model.lbsRank import Rank
57+
from ..Model.lbsNode import Node
5758

5859
class LoadReader:
5960
"""A class to read VT Object Map files. These json files could be compressed with Brotli.
@@ -75,7 +76,13 @@ class LoadReader:
7576
}
7677

7778
def __init__(
78-
self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None):
79+
self,
80+
file_prefix: str,
81+
logger: Logger,
82+
file_suffix: str="json",
83+
check_schema: bool=True,
84+
expected_ranks=None,
85+
ranks_per_node: int=1):
7986
# The base directory and file name for the log files
8087
self.__file_prefix = file_prefix
8188

@@ -98,27 +105,36 @@ def __init__(
98105
manager = Manager()
99106
self.__metadata = manager.dict()
100107

101-
# imported JSON_data_files_validator module (lazy import)
108+
# Lazy import of JSON_data_files_validator module
102109
if LoadReader.SCHEMA_VALIDATOR_CLASS is None:
103110
# pylint:disable=import-outside-toplevel
104111
from ..imported.JSON_data_files_validator import \
105112
SchemaValidator as \
106113
sv
107114
LoadReader.SCHEMA_VALIDATOR_CLASS = sv
108115

109-
# determine the number of ranks
116+
# Determine the number of ranks
110117
self.n_ranks = self._get_n_ranks()
111118
self.__logger.info(f"Number of ranks: {self.n_ranks}")
112119

113-
# warn user if expected_ranks is set and is different from n_ranks
120+
# Initialize nodes
121+
self.__nodes = []
122+
self.ranks_per_node = ranks_per_node
123+
self.__logger.info(f"Ranks per node: {ranks_per_node}")
124+
if self.ranks_per_node > 1:
125+
n_nodes = int(self.n_ranks // self.ranks_per_node)
126+
self.__nodes = [
127+
Node(self.__logger, i) for i in range(0, n_nodes)]
128+
129+
# Warn user if expected_ranks is set and is different from n_ranks
114130
if self.expected_ranks is not None and self.expected_ranks != self.n_ranks:
115131
self.__logger.warn(f"Unexpected number of ranks ({self.expected_ranks} was expected)")
116132

117-
# init vt data
133+
# Initialize vt data
118134
self.__vt_data = {}
119135

136+
# Load vt data concurrently with one file per rank
120137
if self.n_ranks > 0:
121-
# Load vt data concurrently from rank 1 to n_ranks
122138
with Pool(context=get_context("fork")) as pool:
123139
results = pool.imap_unordered(
124140
self._load_vt_file, range(0, self.n_ranks))
@@ -133,22 +149,24 @@ def __init__(
133149
raise SystemExit(1)
134150

135151
def _get_n_ranks(self):
136-
"""Determine the number of ranks automatically.
137-
This uses the first applicable method in the following methods:
138-
List all data file names matching {file_prefix}.{rank_id}.{file_suffix}
139-
pattern and return max(rank_id) + 1.
140-
"""
152+
"""Determine the number of ranks automatically."""
141153

142-
# or default detect data files with pattern
154+
# Assemble data directory name
143155
data_dir = f"{os.sep}".join(self.__file_prefix.split(os.sep)[:-1])
144-
pattern = re.compile(rf"^{self.__file_prefix}.(\d+).{self.__file_suffix}$")
156+
157+
# Default highest rank ID to zero
145158
highest_rank = 0
159+
160+
# Look for pattern matching {file_prefix}.{rank_id}.{file_suffix}
161+
pattern = re.compile(rf"^{self.__file_prefix}.(\d+).{self.__file_suffix}$")
146162
for name in os.listdir(data_dir):
147163
path = os.path.join(data_dir, name)
148164
match_result = pattern.search(path)
149165
if match_result:
150166
rank_id = int(match_result.group(1))
151167
highest_rank = max(highest_rank, rank_id)
168+
169+
# Return number of ranks
152170
return highest_rank + 1
153171

154172
def _get_rank_file_name(self, rank_id: int):
@@ -266,10 +284,18 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
266284
# No communications for this phase
267285
self.__communications_dict.setdefault(phase_id, {rank_id: {}})
268286

269-
# Instantiante rank for current phase
287+
288+
# Create phase rank
270289
phase_rank = Rank(self.__logger, rank_id)
271290
phase_rank.set_metadata(self.__metadata[rank_id])
272291

292+
# Create node when required
293+
rank_node = None
294+
if self.ranks_per_node > 1:
295+
rank_node = self.__nodes[rank_id // self.ranks_per_node]
296+
rank_node.add_rank(phase_rank)
297+
phase_rank.set_node(rank_node)
298+
273299
# Initialize storage for shared blocks information
274300
rank_blocks, task_user_defined = {}, {}
275301

@@ -324,17 +350,15 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
324350
# Set rank-level memory quantities of interest
325351
phase_rank.set_size(
326352
task_user_defined.get("rank_working_bytes", 0.0))
327-
shared_blocks = set()
328353
for b_id, (b_size, objects) in rank_blocks.items():
329-
# Create and add new block
330-
shared_blocks.add(block := Block(
354+
# Create new block
355+
block = Block(
331356
b_id, h_id=rank_id, size=b_size,
332-
o_ids={o.get_id() for o in objects}))
357+
o_ids={o.get_id() for o in objects})
333358

334359
# Assign block to objects attached to it
335360
for o in objects:
336361
o.set_shared_block(block)
337-
phase_rank.set_shared_blocks(shared_blocks)
338362

339363
# Returned rank and communicators per phase
340364
return phase_rank, rank_comm
@@ -344,12 +368,15 @@ def populate_phase(self, phase_id: int) -> List[Rank]:
344368

345369
# Create storage for ranks
346370
ranks: List[Rank] = [None] * self.n_ranks
371+
372+
# Create storage for communications
347373
communications = {}
348374

349375
# Iterate over all ranks
350376
for rank_id in range(self.n_ranks):
351377
# Read data for given phase and assign it to rank
352-
ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id)
378+
ranks[rank_id], rank_comm = self._populate_rank(
379+
phase_id, rank_id)
353380

354381
# Merge rank communication with existing ones
355382
for k, v in rank_comm.items():

0 commit comments

Comments
 (0)