Skip to content

Commit 4bd7858

Browse files
committed
#617: experiment with thread concurrency
1 parent b198cd6 commit 4bd7858

File tree

2 files changed

+58
-4
lines changed

2 files changed

+58
-4
lines changed

src/lbaf/IO/lbsVTDataWriter.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,43 @@ def _json_writer(self, rank_phases_double) -> str:
377377
return file_name
378378

379379
@timer
380+
def __write_with_concurrency(self):
381+
from concurrent.futures import ProcessPoolExecutor, as_completed
382+
with ProcessPoolExecutor() as executor:
383+
futures = {executor.submit(self._json_writer, item): item for item in self.__rank_phases.items()}
384+
for future in as_completed(futures):
385+
try:
386+
file_name = future.result()
387+
self.__logger.info(f"Wrote {file_name}")
388+
except Exception as e:
389+
self.__logger.error(f"Error processing {futures[future]}: {e}")
390+
391+
@timer
392+
def __write_with_standard_mp(self):
393+
with mp.pool.Pool(context=mp.get_context("fork")) as pool:
394+
results = pool.imap_unordered(
395+
self._json_writer, self.__rank_phases.items())
396+
for file_name in results:
397+
self.__logger.info(f"Wrote {file_name}")
398+
399+
@timer
400+
def __write_with_thread_concurrency(self):
401+
from concurrent.futures import ThreadPoolExecutor, as_completed
402+
with ThreadPoolExecutor() as executor:
403+
futures = {executor.submit(self._json_writer, item): item for item in self.__rank_phases.items()}
404+
for future in as_completed(futures):
405+
try:
406+
file_name = future.result()
407+
self.__logger.info(f"Wrote {file_name}")
408+
except Exception as e:
409+
self.__logger.error(f"Error processing {futures[future]}: {e}")
410+
411+
@timer
412+
def __write_in_serial(self):
413+
for rank_phases_double in self.__rank_phases.items():
414+
file_name = self._json_writer(rank_phases_double)
415+
self.__logger.info(f"Wrote {file_name}")
416+
@timer
380417
def write(self, phases: dict):
381418
""" Write one JSON per rank for dictonary of phases with possibly iterations."""
382419
# Ensure that provided phase has correct type
@@ -397,7 +434,24 @@ def write(self, phases: dict):
397434
# Prevent recursion overruns
398435
sys.setrecursionlimit(25000)
399436

437+
######################################################################################
400438
# Write individual rank files
401-
for rank_phases_double in self.__rank_phases.items():
402-
file_name = self._json_writer(rank_phases_double)
403-
self.__logger.info(f"Wrote {file_name}")
439+
# try:
440+
# self.__write_with_standard_mp()
441+
# except:
442+
# print("There was an error running the standard MP function.")
443+
444+
# try:
445+
# self.__write_with_concurrency()
446+
# except:
447+
# print("There was an error with self.__write_with_concurrency")
448+
449+
try:
450+
self.__write_with_thread_concurrency()
451+
except:
452+
print("There was an error with self.__write_with_thread_concurrency")
453+
454+
try:
455+
self.__write_in_serial()
456+
except:
457+
print("There was an error with self.__write_in_serial")

src/lbaf/Utils/lbsTimerDecorator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def wrapper(self, *args, **kwargs):
1414

1515
# Get a logger instance (stacklevel=2 uses the logger from the function being decorated)
1616
logger = logging.getLogger(self.__class__.__module__)
17-
logger.debug(f"{method.__name__}: {dur:.4f} s", stacklevel=2)
17+
logger.info(f"{method.__name__}: {dur:.4f} s", stacklevel=2)
1818

1919
return res
2020
return wrapper

0 commit comments

Comments
 (0)