Skip to content

Commit 966684c

Browse files
committed
#617: tmp fix: do not use mp for writing JSON
1 parent f04473f commit 966684c

File tree

2 files changed

+46
-15
lines changed

2 files changed

+46
-15
lines changed

src/lbaf/Applications/LBAF_app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ def run(self, cfg=None, cfg_dir=None):
597597
# Insert rebalanced phase into dictionary of phases
598598
phases[p_id] = rebalanced_phase
599599

600-
# Write all phasesOA
600+
# Write all phases
601601
self.__logger.info(
602602
f"Writing all ({len(phases)}) phases for offline load-balancing")
603603
self.__json_writer.write(phases)

src/lbaf/IO/lbsVTDataWriter.py

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ def __find_object_rank(self, phase: Phase, obj: Object):
187187
def __get_communications(self, phase: Phase, rank: Rank):
188188
"""Create communication entries to be outputted to JSON."""
189189

190+
if not self.__add_communications:
191+
return None
192+
190193
# Get initial communications (if any) for current phase
191194
phase_communications_dict = phase.get_communications()
192195

@@ -324,10 +327,9 @@ def _json_serializer(self, rank_phases_double) -> str:
324327
phase_data["user_defined"]["num_homed_ratio"] = homed_ratio
325328

326329
# Add communication data if present
327-
if self.__add_communications:
328-
communications = self.__get_communications(current_phase, rank)
329-
if communications:
330-
phase_data["communications"] = communications
330+
communications = self.__get_communications(current_phase, rank)
331+
if communications:
332+
phase_data["communications"] = communications
331333

332334
# Add load balancing iterations if present
333335
lb_iterations = current_phase.get_lb_iterations()
@@ -363,10 +365,9 @@ def _json_serializer(self, rank_phases_double) -> str:
363365
iteration_data["user_defined"]["num_homed_ratio"] = homed_ratio
364366

365367
# Add communication data if present
366-
if self.__add_communications:
367-
communications = self.__get_communications(it, it_r)
368-
if communications:
369-
iteration_data["communications"] = communications
368+
communications = self.__get_communications(it, it_r)
369+
if communications:
370+
iteration_data["communications"] = communications
370371

371372
# Append load balancing iteration to phase data
372373
phase_data["lb_iterations"].append(iteration_data)
@@ -404,27 +405,57 @@ def _json_writer(self, rank_phases_double) -> str:
404405
def write(self, phases: dict):
405406
""" Write one JSON per rank for dictonary of phases with possibly iterations."""
406407

408+
## REGION A
409+
410+
# start = time.time()
411+
407412
# Ensure that provided phase has correct type
408413
if not isinstance(phases, dict):
409414
self.__logger.error(
410415
"JSON writer must be passed a dictionary")
411416
raise SystemExit(1)
412417
self.__phases = phases
413418

419+
# end = time.time()
420+
# dur = end - start
421+
# print(f"Region A: {dur} s")
422+
423+
## REGION B
424+
425+
# start = time.time()
426+
414427
# Assemble mapping from ranks to their phases
415428
self.__rank_phases = {}
416429
for phase in self.__phases.values():
417-
# Handle case where entry only cintains a phase
430+
# Handle case where entry only contains a phase
418431
for r in phase.get_ranks():
419432
self.__rank_phases.setdefault(r.get_id(), {})
420433
self.__rank_phases[r.get_id()][phase.get_id()] = r
421434

422435
# Prevent recursion overruns
423436
sys.setrecursionlimit(25000)
424437

438+
# end = time.time()
439+
# dur = end - start
440+
# print(f"Region B: {dur} s")
441+
442+
## REGION C
443+
444+
# start = time.time()
445+
425446
# Write individual rank files using data parallelism
426-
with mp.pool.Pool(context=mp.get_context("fork")) as pool:
427-
results = pool.imap_unordered(
428-
self._json_writer, self.__rank_phases.items())
429-
for file_name in results:
430-
self.__logger.info(f"Wrote {file_name}")
447+
# with mp.pool.Pool(context=mp.get_context("spawn")) as pool:
448+
# self.__logger.info(f"{pool._processes} threads for {len(self.__rank_phases)} ranks.")
449+
# results = pool.imap_unordered(
450+
# self._json_writer, self.__rank_phases.items())
451+
# for file_name in results:
452+
# self.__logger.info(f"Wrote {file_name}")
453+
454+
# Try in serial
455+
for rank_phases_double in self.__rank_phases.items():
456+
file_name = self._json_writer(rank_phases_double)
457+
self.__logger.info(f"Wrote {file_name}")
458+
459+
# end = time.time()
460+
# dur = end - start
461+
# print(f"Region C: {dur} s")

0 commit comments

Comments
 (0)