Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion communication_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def beam_2_buffer(beam, mode='pickle', verbose=False):
np.array([beam.mass]),
np.array([beam.circumference]),
np.array([beam.gamma]),
np.atleast_1d(np.float_(beam.id)),
np.atleast_1d(np.float64(beam.id)),
beam.x, beam.xp, beam.y, beam.yp, beam.z, beam.dp,
np.array([float(len(sinfo_float_buf))]),sinfo_float_buf)), dtype=np.float64)

Expand Down
65 changes: 45 additions & 20 deletions multiprocexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import os, sys
import importlib
from . import parse_sim_class_string as psc
import traceback
import time

class mpComm(object):
def __init__(self, pid, N_proc, queue_list,
Expand Down Expand Up @@ -60,31 +62,35 @@ def Bcast(self, buf, root):

def todo(sim_module_string, pid, N_proc, queue_list,
mutex, barriex, turnstile, turnstile2, cnt, multiturn):
try:
comm = mpComm(pid, N_proc, queue_list,
mutex, barriex, turnstile, turnstile2, cnt)

comm = mpComm(pid, N_proc, queue_list,
mutex, barriex, turnstile, turnstile2, cnt)
BIN = os.path.expanduser("./")
sys.path.append(BIN)

BIN = os.path.expanduser("./")
sys.path.append(BIN)
# if len(sim_module_strings)!=2:
# raise(ValueError('\n\nsim_class must be given in the form: module.class.\nNested referencing not implemented.\n\n'))

# if len(sim_module_strings)!=2:
# raise(ValueError('\n\nsim_class must be given in the form: module.class.\nNested referencing not implemented.\n\n'))
module_name, class_name, dict_kwargs = psc.parse_sim_class_string(
sim_module_string)

module_name, class_name, dict_kwargs = psc.parse_sim_class_string(
sim_module_string)
SimModule = importlib.import_module(module_name)
SimClass = getattr(SimModule, class_name)

SimModule = importlib.import_module(module_name)
SimClass = getattr(SimModule, class_name)

simulation_content = SimClass(**dict_kwargs)
if multiturn:
from PyPARIS.ring_of_CPUs_multiturn import RingOfCPUs_multiturn
myCPUring = RingOfCPUs_multiturn(simulation_content, comm=comm)
myCPUring.run()
else:
from PyPARIS.ring_of_CPUs import RingOfCPUs
myCPUring = RingOfCPUs(simulation_content, comm=comm)
myCPUring.run()
simulation_content = SimClass(**dict_kwargs)
if multiturn:
from PyPARIS.ring_of_CPUs_multiturn import RingOfCPUs_multiturn
myCPUring = RingOfCPUs_multiturn(simulation_content, comm=comm)
myCPUring.run()
else:
from PyPARIS.ring_of_CPUs import RingOfCPUs
myCPUring = RingOfCPUs(simulation_content, comm=comm)
myCPUring.run()
except Exception as e:
print(f"[ERROR] An exception occurred: {e}", file=sys.stderr)
traceback.print_exc()
os._exit(1)


if __name__=='__main__':
Expand Down Expand Up @@ -119,6 +125,25 @@ def todo(sim_module_string, pid, N_proc, queue_list,
mutex, barrier, turnstile, turnstile2, cnt, multiturn)))
for p in proc_list:
p.start()

sim_start_time = time.time()
# Thread monitor
while True:
alive = [p.is_alive() for p in proc_list]
exitcodes = [p.exitcode for p in proc_list]
if any(code is not None and code != 0 for code in exitcodes):
print("[Main] ERROR: One or more processes failed. Terminating all...", file=sys.stderr)
for p in proc_list:
if p.is_alive():
p.terminate()
sys.exit(1)
if not any(alive): # all finished
break
if time.time() - sim_start_time > 300: # Stop checks if simulation has ran successfully for more than 5 minutes
break

time.sleep(5) # avoid busy loop, check every 5 seconds

for p in proc_list:
p.join()