Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def run(self, cfg=None, cfg_dir=None):
# Insert rebalanced phase into dictionary of phases
phases[p_id] = rebalanced_phase

# Write all phasesOA
# Write all phases
self.__logger.info(
f"Writing all ({len(phases)}) phases for offline load-balancing")
self.__json_writer.write(phases)
Expand Down
38 changes: 17 additions & 21 deletions src/lbaf/IO/lbsVTDataWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import os
import sys
import math

from logging import Logger
from typing import Optional

Expand All @@ -54,6 +55,7 @@
from ..Model.lbsRank import Rank
from ..Model.lbsObject import Object

from ..Utils.lbsTimerDecorator import timer

class VTDataWriter:
"""A class to write load directives for VT as JSON files
Expand Down Expand Up @@ -94,11 +96,13 @@ def __init__(
try:
self.__extension = parameters["json_output_suffix"]
self.__compress = parameters["compressed"]
self.__add_communications = parameters.get("communications", True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to default to True here? Or should users opt-in to writing comms?

except Exception as e:
self.__logger.error(
f"Missing JSON writer configuration parameter(s): {e}")
raise SystemExit(1) from e

@timer
def __create_tasks(self, rank_id, objects, migratable):
"""Create per-object entries to be outputted to JSON."""
tasks = []
Expand Down Expand Up @@ -145,6 +149,7 @@ def __create_tasks(self, rank_id, objects, migratable):
# Return created tasks on this rank
return tasks

@timer
def __create_task_data(self, rank: Rank):
"""Create task data."""
return sorted(
Expand All @@ -155,20 +160,13 @@ def __create_task_data(self, rank: Rank):
key=lambda x: x.get("entity").get(
"id", x.get("entity").get("seq_id")))

def __find_object_rank(self, phase: Phase, obj: Object):
    """Return the rank of *phase* that owns *obj*.

    :param phase: phase whose ranks are searched.
    :param obj: object to locate.
    :return: the Rank instance containing the object.
    :raises SystemExit: when no rank of the phase contains the object.
    """
    for r in phase.get_ranks():
        if obj in r.get_objects():
            return r

    # If this point is reached the object could not be found
    # (bug fix: the original interpolated the builtin `object`, not `obj`)
    self.__logger.error(
        f"Object id {obj} cannot be located in any rank of phase {phase.get_id()}")
    raise SystemExit(1)

@timer
def __get_communications(self, phase: Phase, rank: Rank):
"""Create communication entries to be outputted to JSON."""

if not self.__add_communications:
return None

# Get initial communications (if any) for current phase
phase_communications_dict = phase.get_communications()

Expand All @@ -194,7 +192,6 @@ def __get_communications(self, phase: Phase, rank: Rank):
# Retrieve communications with single sender
sender_obj = sender_obj[0]
sender_rank_id = sender_obj.get_rank_id()
#sender_rank_id = self.__find_object_rank(phase, sender_obj).get_id()
from_rank: Rank = [
r for r in phase.get_ranks() if r.get_id() == sender_rank_id][0]
comm_entry["from"]["home"] = sender_rank_id
Expand All @@ -215,7 +212,6 @@ def __get_communications(self, phase: Phase, rank: Rank):
# Retrieve communications with single receiver
receiver_obj = receiver_obj[0]
receiver_rank_id = receiver_obj.get_rank_id()
#receiver_rank_id = self.__find_object_rank(phase, receiver_obj).get_id()
comm_entry["to"]["home"] = receiver_rank_id
to_rank: Rank = [
r for r in phase.get_ranks() if receiver_obj in r.get_objects()][0]
Expand Down Expand Up @@ -254,6 +250,7 @@ def __get_communications(self, phase: Phase, rank: Rank):
# Return created list of communications
return communications

@timer
def _json_serializer(self, rank_phases_double) -> str:
"""Write one JSON per rank for list of phase instances."""
# Unpack received double
Expand Down Expand Up @@ -357,6 +354,7 @@ def _json_serializer(self, rank_phases_double) -> str:
serial_json = json.dumps(output, separators=(',', ':'))
return serial_json

@timer
def _json_writer(self, rank_phases_double) -> str:
"""Write one JSON per rank for list of phase instances."""
# Unpack received double
Expand All @@ -378,9 +376,9 @@ def _json_writer(self, rank_phases_double) -> str:
# Return JSON file name
return file_name

@timer
def write(self, phases: dict):
""" Write one JSON per rank for dictonary of phases with possibly iterations."""

# Ensure that provided phase has correct type
if not isinstance(phases, dict):
self.__logger.error(
Expand All @@ -391,17 +389,15 @@ def write(self, phases: dict):
# Assemble mapping from ranks to their phases
self.__rank_phases = {}
for phase in self.__phases.values():
# Handle case where entry only cintains a phase
# Handle case where entry only contains a phase
for r in phase.get_ranks():
self.__rank_phases.setdefault(r.get_id(), {})
self.__rank_phases[r.get_id()][phase.get_id()] = r

# Prevent recursion overruns
sys.setrecursionlimit(25000)

# Write individual rank files using data parallelism
with mp.pool.Pool(context=mp.get_context("fork")) as pool:
results = pool.imap_unordered(
self._json_writer, self.__rank_phases.items())
for file_name in results:
self.__logger.info(f"Wrote {file_name}")
# Write individual rank files
for rank_phases_double in self.__rank_phases.items():
file_name = self._json_writer(rank_phases_double)
self.__logger.info(f"Wrote {file_name}")
20 changes: 20 additions & 0 deletions src/lbaf/Utils/lbsTimerDecorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import time
import logging
import functools

def timer(method):
    """Decorator that logs the execution time of *method* at DEBUG level.

    Intended for instance methods: the logger is looked up from the module
    of the receiver's class, so timing messages appear under that module's
    logger. The wrapped method's return value is passed through unchanged.

    :param method: bound-style method (first positional argument is ``self``).
    :return: wrapped method with identical signature and behavior.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # perf_counter is monotonic and high-resolution, unlike time.time,
        # which can jump backwards/forwards on system clock adjustments
        start = time.perf_counter()
        res = method(self, *args, **kwargs)
        dur = time.perf_counter() - start

        # Log under the decorated class's module; stacklevel=2 attributes
        # the record to the decorated method rather than this wrapper.
        # Lazy %-style args avoid formatting cost when DEBUG is disabled.
        logger = logging.getLogger(self.__class__.__module__)
        logger.debug("%s: %.4f s", method.__name__, dur, stacklevel=2)

        return res
    return wrapper
Loading